From e32debe1df7d0a837e30cd937fb7a18fc5cfa203 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 24 Apr 2008 17:49:03 +0000 Subject: QPID-832 : Fix eol-style git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651325 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/ping/PingClient.java | 214 +- .../org/apache/qpid/ping/PingDurableClient.java | 904 ++--- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 622 ++-- .../org/apache/qpid/ping/PingSendOnlyClient.java | 186 +- .../java/org/apache/qpid/ping/PingTestPerf.java | 392 +-- .../apache/qpid/requestreply/PingPongBouncer.java | 906 +++--- .../apache/qpid/requestreply/PingPongProducer.java | 3440 ++++++++++---------- .../apache/qpid/requestreply/PingPongTestPerf.java | 502 +-- .../qpid/test/testcases/MessageThroughputPerf.java | 398 +-- 9 files changed, 3782 insertions(+), 3782 deletions(-) (limited to 'qpid/java/perftests') diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index b9632eee4c..0afec83b19 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -1,107 +1,107 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import javax.jms.Destination; - -import java.util.List; -import java.util.Properties; - -/** - * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer} - * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues. - * It is an all in one ping client, that produces and consumes its own pings. - * - *

The constructor increments a count of the number of ping clients created. It is assumed that where many - * are created they will all be run in parallel and be active in sending and consuming pings at the same time. - * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear - * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of - * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these - * conditions. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Create a ping producer that listens to its own pings {@link PingPongProducer} - *
Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings. - *
- */ -public class PingClient extends PingPongProducer -{ - /** Used for debugging. */ - private final Logger log = Logger.getLogger(PingClient.class); - - /** Used to count the number of ping clients created. */ - private static int _pingClientCount; - - /** - * Creates a ping producer with the specified parameters, of which there are many. See the class level comments - * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates - * producer and consumer sessions on it, to send and recieve its pings and replies on. - * - * @param overrides Properties containing any desired overrides to the defaults. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingClient(Properties overrides) throws Exception - { - super(overrides); - - _pingClientCount++; - } - - /** - * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the - * effect of making this pinger listen to its own pings. - * - * @return The ping destinations. - */ - public List getReplyDestinations() - { - return _pingDestinations; - } - - /** - * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging. - * - * @return The scaling up of the number of expected pub/sub pings. - */ - public int getConsumersPerDestination() - { - log.debug("public int getConsumersPerDestination(): called"); - - if (_isUnique) - { - log.debug(_noOfConsumers + " consumer per destination."); - - return _noOfConsumers; - } - else - { - log.debug((_pingClientCount * _noOfConsumers) + " consumers per destination."); - - return _pingClientCount * _noOfConsumers; - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import javax.jms.Destination; + +import java.util.List; +import java.util.Properties; + +/** + * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer} + * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues. + * It is an all in one ping client, that produces and consumes its own pings. + * + *

The constructor increments a count of the number of ping clients created. It is assumed that where many + * are created they will all be run in parallel and be active in sending and consuming pings at the same time. + * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear + * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of + * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these + * conditions. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Create a ping producer that listens to its own pings {@link PingPongProducer} + *
Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings. + *
+ */ +public class PingClient extends PingPongProducer +{ + /** Used for debugging. */ + private final Logger log = Logger.getLogger(PingClient.class); + + /** Used to count the number of ping clients created. */ + private static int _pingClientCount; + + /** + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates + * producer and consumer sessions on it, to send and recieve its pings and replies on. + * + * @param overrides Properties containing any desired overrides to the defaults. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingClient(Properties overrides) throws Exception + { + super(overrides); + + _pingClientCount++; + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging. + * + * @return The scaling up of the number of expected pub/sub pings. + */ + public int getConsumersPerDestination() + { + log.debug("public int getConsumersPerDestination(): called"); + + if (_isUnique) + { + log.debug(_noOfConsumers + " consumer per destination."); + + return _noOfConsumers; + } + else + { + log.debug((_pingClientCount * _noOfConsumers) + " consumers per destination."); + + return _pingClientCount * _noOfConsumers; + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index 0c8c19243a..a15897c82b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -1,452 +1,452 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; -import org.apache.qpid.util.CommandLineParser; - -import org.apache.qpid.junit.extensions.util.MathUtils; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and - * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop - * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the - * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with - * failure conditions when using durable messaging. - * - *

The events that can stop it from sending are input from the user on the console, failure of its connection to - * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases - * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings - * with. - * - *

The event to re-connect and attempt to recieve the pings is input from the user on the console. - * - *

This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and - * additionally accepts the following parameters: - * - *

- *
Parameters
Parameter Default Comments - *
numMessages 100 The total number of messages to send. - *
numMessagesToAction -1 The number of messages to send before taking a custom 'action'. - *
duration 30S The length of time to ping for. (Format dDhHmMsS, for d days, h hours, - * m minutes and s seconds). - *
- * - *

This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up - * when no parameters are specified. - * - *

- *
Parameters
Parameter Default Comments - *
uniqueDests false Prevents destination names being timestamped. - *
transacted true Only makes sense to test with transactions. - *
persistent true Only makes sense to test persistent. - *
durableDests true Should use durable queues with persistent messages. - *
commitBatchSize 10 - *
rate 20 Total default test time is 5 seconds. - *
- * - *

When a number of messages or duration is specified, this ping client will ping until the first of those limits - * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will - * wait for the second signal before receiving its pings. - * - *

This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages - * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method, - * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide - * custom behaviour with alternative implementations of this method (for example taking a backup). - * - *

- *
CRC Card
Responsibilities Collaborations - *
Send and receive pings. - *
Accept user input to signal stop sending. - *
Accept user input to signal start receiving. - *
Provide feedback on pings sent versus pings received. - *
Provide extension point for arbitrary action on a particular message count. - *
- */ -public class PingDurableClient extends PingPongProducer implements ExceptionListener -{ - private static final Logger log = Logger.getLogger(PingDurableClient.class); - - public static final String NUM_MESSAGES_PROPNAME = "numMessages"; - public static final String NUM_MESSAGES_DEFAULT = "100"; - public static final String DURATION_PROPNAME = "duration"; - public static final String DURATION_DEFAULT = "30S"; - public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction"; - public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1"; - - /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ - private static final long TIME_OUT = 3000; - - static - { - defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); - defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); - defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); - defaults.setProperty(TRANSACTED_PROPNAME, "true"); - defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); - defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); - defaults.setProperty(RATE_PROPNAME, "20"); - defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); - defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); - } - - /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ - private int numMessages; - - /** Holds the number of messages to send before taking triggering the action. */ - private int numMessagesToAction; - - /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ - private long duration; - - /** Used to indciate that this application should terminate. Set by the shutdown hook. */ - private boolean terminate = false; - - /** - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingDurableClient(Properties overrides) throws Exception - { - super(overrides); - log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); - - // Extract the additional configuration parameters. - ParsedProperties properties = new ParsedProperties(defaults); - properties.putAll(overrides); - - numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); - String durationSpec = properties.getProperty(DURATION_PROPNAME); - numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME); - - if (durationSpec != null) - { - duration = MathUtils.parseDuration(durationSpec) * 1000000; - } - } - - /** - * Starts the ping/wait/receive process. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - PingDurableClient pingProducer = new PingDurableClient(options); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.closeConnection(); - pingProducer.waitForUser("Press return to begin receiving the pings."); - pingProducer.receive(sent); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Performs the main test procedure implemented by this ping client. See the class level comment for details. - */ - protected int send() throws Exception - { - log.debug("public void sendWaitReceive(): called"); - - log.debug("duration = " + duration); - log.debug("numMessages = " + numMessages); - - if (duration > 0) - { - System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); - } - - if (_rate > 0) - { - System.out.println("Sending at " + _rate + " messages per second."); - } - - if (numMessages > 0) - { - System.out.println("Sending up to " + numMessages + " messages."); - } - - // Establish the connection and the message producer. - establishConnection(true, false); - _connection.start(); - - Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); - - // Send pings until a terminating condition is received. - boolean endCondition = false; - int messagesSent = 0; - int messagesCommitted = 0; - int messagesNotCommitted = 0; - long start = System.nanoTime(); - - // Clear console in. - clearConsole(); - - while (!endCondition) - { - boolean committed = false; - - try - { - committed = sendMessage(messagesSent, message) && _transacted; - - messagesSent++; - messagesNotCommitted++; - - // Keep count of the number of messsages currently committed and pending commit. - if (committed) - { - log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); - messagesCommitted += messagesNotCommitted; - messagesNotCommitted = 0; - - System.out.println("Commited: " + messagesCommitted); - } - } - catch (JMSException e) - { - log.debug("Got JMSException whilst sending."); - _publish = false; - } - - // Perform the arbitrary action if the number of messages sent has reached the right number. - if (messagesSent == numMessagesToAction) - { - System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = " - + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted); - takeAction(); - } - - // Determine if the end condition has been met, based on the number of messages, time passed, errors on - // the connection or user input. - long now = System.nanoTime(); - - if ((duration != 0) && ((now - start) > duration)) - { - System.out.println("Send halted because duration expired."); - endCondition = true; - } - else if ((numMessages != 0) && (messagesSent >= numMessages)) - { - System.out.println("Send halted because # messages completed."); - endCondition = true; - } - else if (System.in.available() > 0) - { - System.out.println("Send halted by user input."); - endCondition = true; - - clearConsole(); - } - else if (!_publish) - { - System.out.println("Send halted by error on the connection."); - endCondition = true; - } - } - - log.debug("messagesSent = " + messagesSent); - log.debug("messagesCommitted = " + messagesCommitted); - log.debug("messagesNotCommitted = " + messagesNotCommitted); - - System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted - + ", Messages not Committed = " + messagesNotCommitted); - - return messagesSent; - } - - protected void closeConnection() - { - // Clean up the connection. - try - { - close(); - } - catch (JMSException e) - { - log.debug("There was an error whilst closing the connection: " + e, e); - System.out.println("There was an error whilst closing the connection."); - - // Ignore as did best could manage to clean up. - } - } - - protected void receive(int messagesSent) throws Exception - { - // Re-establish the connection and the message consumer. - _queueJVMSequenceID = new AtomicInteger(); - _queueSharedID = new AtomicInteger(); - - establishConnection(false, true); - _consumer[0].setMessageListener(null); - _consumerConnection[0].start(); - - // Try to receive all of the pings that were successfully sent. - int messagesReceived = 0; - boolean endCondition = false; - - while (!endCondition) - { - // Message received = _consumer.receiveNoWait(); - Message received = _consumer[0].receive(TIME_OUT); - log.debug("received = " + received); - - if (received != null) - { - messagesReceived++; - } - - // Determine if the end condition has been met, based on the number of messages and time passed since last - // receiving a message. - if (received == null) - { - System.out.println("Timed out."); - endCondition = true; - } - else if (messagesReceived >= messagesSent) - { - System.out.println("Got all messages."); - endCondition = true; - } - } - - // Ensure messages received are committed. - if (_consTransacted) - { - try - { - _consumerSession[0].commit(); - System.out.println("Committed for all messages received."); - } - catch (JMSException e) - { - log.debug("Error during commit: " + e, e); - System.out.println("Error during commit."); - try - { - _consumerSession[0].rollback(); - System.out.println("Rolled back on all messages received."); - } - catch (JMSException e2) - { - log.debug("Error during rollback: " + e, e); - System.out.println("Error on roll back of all messages received."); - } - - } - } - - log.debug("messagesReceived = " + messagesReceived); - - System.out.println("Messages received: " + messagesReceived); - - // Clean up the connection. - close(); - } - - /** - * Clears any pending input from the console. - */ - private void clearConsole() - { - try - { - BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); - - // System.in.skip(System.in.available()); - while (bis.ready()) - { - bis.readLine(); - } - } - catch (IOException e) - { } - } - - /** - * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the - * effect of making this pinger listen to its own pings. - * - * @return The ping destinations. - */ - public List getReplyDestinations() - { - return _pingDestinations; - } - - /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with - * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the - * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving - * message should stop, not that the application should termiante. - * - * @return A shutdown hook for the ping loop. - */ - public Thread getShutdownHook() - { - return new Thread(new Runnable() - { - public void run() - { - stop(); - terminate = true; - } - }); - } - - /** - * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default - * implementation does nothing. - */ - public void takeAction() - { } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; +import org.apache.qpid.util.CommandLineParser; + +import org.apache.qpid.junit.extensions.util.MathUtils; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and + * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop + * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the + * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with + * failure conditions when using durable messaging. + * + *

The events that can stop it from sending are input from the user on the console, failure of its connection to + * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases + * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings + * with. + * + *

The event to re-connect and attempt to recieve the pings is input from the user on the console. + * + *

This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and + * additionally accepts the following parameters: + * + *

+ *
Parameters
Parameter Default Comments + *
numMessages 100 The total number of messages to send. + *
numMessagesToAction -1 The number of messages to send before taking a custom 'action'. + *
duration 30S The length of time to ping for. (Format dDhHmMsS, for d days, h hours, + * m minutes and s seconds). + *
+ * + *

This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up + * when no parameters are specified. + * + *

+ *
Parameters
Parameter Default Comments + *
uniqueDests false Prevents destination names being timestamped. + *
transacted true Only makes sense to test with transactions. + *
persistent true Only makes sense to test persistent. + *
durableDests true Should use durable queues with persistent messages. + *
commitBatchSize 10 + *
rate 20 Total default test time is 5 seconds. + *
+ * + *

When a number of messages or duration is specified, this ping client will ping until the first of those limits + * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will + * wait for the second signal before receiving its pings. + * + *

This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages + * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method, + * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide + * custom behaviour with alternative implementations of this method (for example taking a backup). + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send and receive pings. + *
Accept user input to signal stop sending. + *
Accept user input to signal start receiving. + *
Provide feedback on pings sent versus pings received. + *
Provide extension point for arbitrary action on a particular message count. + *
+ */ +public class PingDurableClient extends PingPongProducer implements ExceptionListener +{ + private static final Logger log = Logger.getLogger(PingDurableClient.class); + + public static final String NUM_MESSAGES_PROPNAME = "numMessages"; + public static final String NUM_MESSAGES_DEFAULT = "100"; + public static final String DURATION_PROPNAME = "duration"; + public static final String DURATION_DEFAULT = "30S"; + public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction"; + public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1"; + + /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ + private static final long TIME_OUT = 3000; + + static + { + defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); + defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); + defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); + defaults.setProperty(TRANSACTED_PROPNAME, "true"); + defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); + defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); + defaults.setProperty(RATE_PROPNAME, "20"); + defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); + defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); + } + + /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ + private int numMessages; + + /** Holds the number of messages to send before taking triggering the action. */ + private int numMessagesToAction; + + /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ + private long duration; + + /** Used to indciate that this application should terminate. Set by the shutdown hook. */ + private boolean terminate = false; + + /** + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingDurableClient(Properties overrides) throws Exception + { + super(overrides); + log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); + + // Extract the additional configuration parameters. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); + String durationSpec = properties.getProperty(DURATION_PROPNAME); + numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME); + + if (durationSpec != null) + { + duration = MathUtils.parseDuration(durationSpec) * 1000000; + } + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingDurableClient pingProducer = new PingDurableClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.closeConnection(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Performs the main test procedure implemented by this ping client. See the class level comment for details. + */ + protected int send() throws Exception + { + log.debug("public void sendWaitReceive(): called"); + + log.debug("duration = " + duration); + log.debug("numMessages = " + numMessages); + + if (duration > 0) + { + System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); + } + + if (_rate > 0) + { + System.out.println("Sending at " + _rate + " messages per second."); + } + + if (numMessages > 0) + { + System.out.println("Sending up to " + numMessages + " messages."); + } + + // Establish the connection and the message producer. + establishConnection(true, false); + _connection.start(); + + Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + + // Send pings until a terminating condition is received. + boolean endCondition = false; + int messagesSent = 0; + int messagesCommitted = 0; + int messagesNotCommitted = 0; + long start = System.nanoTime(); + + // Clear console in. + clearConsole(); + + while (!endCondition) + { + boolean committed = false; + + try + { + committed = sendMessage(messagesSent, message) && _transacted; + + messagesSent++; + messagesNotCommitted++; + + // Keep count of the number of messsages currently committed and pending commit. + if (committed) + { + log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); + messagesCommitted += messagesNotCommitted; + messagesNotCommitted = 0; + + System.out.println("Commited: " + messagesCommitted); + } + } + catch (JMSException e) + { + log.debug("Got JMSException whilst sending."); + _publish = false; + } + + // Perform the arbitrary action if the number of messages sent has reached the right number. + if (messagesSent == numMessagesToAction) + { + System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = " + + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted); + takeAction(); + } + + // Determine if the end condition has been met, based on the number of messages, time passed, errors on + // the connection or user input. + long now = System.nanoTime(); + + if ((duration != 0) && ((now - start) > duration)) + { + System.out.println("Send halted because duration expired."); + endCondition = true; + } + else if ((numMessages != 0) && (messagesSent >= numMessages)) + { + System.out.println("Send halted because # messages completed."); + endCondition = true; + } + else if (System.in.available() > 0) + { + System.out.println("Send halted by user input."); + endCondition = true; + + clearConsole(); + } + else if (!_publish) + { + System.out.println("Send halted by error on the connection."); + endCondition = true; + } + } + + log.debug("messagesSent = " + messagesSent); + log.debug("messagesCommitted = " + messagesCommitted); + log.debug("messagesNotCommitted = " + messagesNotCommitted); + + System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted + + ", Messages not Committed = " + messagesNotCommitted); + + return messagesSent; + } + + protected void closeConnection() + { + // Clean up the connection. + try + { + close(); + } + catch (JMSException e) + { + log.debug("There was an error whilst closing the connection: " + e, e); + System.out.println("There was an error whilst closing the connection."); + + // Ignore as did best could manage to clean up. + } + } + + protected void receive(int messagesSent) throws Exception + { + // Re-establish the connection and the message consumer. + _queueJVMSequenceID = new AtomicInteger(); + _queueSharedID = new AtomicInteger(); + + establishConnection(false, true); + _consumer[0].setMessageListener(null); + _consumerConnection[0].start(); + + // Try to receive all of the pings that were successfully sent. + int messagesReceived = 0; + boolean endCondition = false; + + while (!endCondition) + { + // Message received = _consumer.receiveNoWait(); + Message received = _consumer[0].receive(TIME_OUT); + log.debug("received = " + received); + + if (received != null) + { + messagesReceived++; + } + + // Determine if the end condition has been met, based on the number of messages and time passed since last + // receiving a message. + if (received == null) + { + System.out.println("Timed out."); + endCondition = true; + } + else if (messagesReceived >= messagesSent) + { + System.out.println("Got all messages."); + endCondition = true; + } + } + + // Ensure messages received are committed. + if (_consTransacted) + { + try + { + _consumerSession[0].commit(); + System.out.println("Committed for all messages received."); + } + catch (JMSException e) + { + log.debug("Error during commit: " + e, e); + System.out.println("Error during commit."); + try + { + _consumerSession[0].rollback(); + System.out.println("Rolled back on all messages received."); + } + catch (JMSException e2) + { + log.debug("Error during rollback: " + e, e); + System.out.println("Error on roll back of all messages received."); + } + + } + } + + log.debug("messagesReceived = " + messagesReceived); + + System.out.println("Messages received: " + messagesReceived); + + // Clean up the connection. + close(); + } + + /** + * Clears any pending input from the console. + */ + private void clearConsole() + { + try + { + BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); + + // System.in.skip(System.in.available()); + while (bis.ready()) + { + bis.readLine(); + } + } + catch (IOException e) + { } + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with + * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the + * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving + * message should stop, not that the application should termiante. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + terminate = true; + } + }); + } + + /** + * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default + * implementation does nothing. + */ + public void takeAction() + { } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java index 215dbcefa3..5ba4004c56 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -1,311 +1,311 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.requestreply.PingPongProducer; - -import org.apache.qpid.junit.extensions.TimingController; -import org.apache.qpid.junit.extensions.TimingControllerAware; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.JMSException; -import javax.jms.Message; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -/** - * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing - * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for - * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from - * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than - * waiting until all expected replies are received. - * - *

This test does not output timings for every single ping message, as when running at high volume, writing the test - * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The - * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the - * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. - * - *

The size parameter logged for each individual ping is set to the size of the batch of messages that the - * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput - * (messages / time) can be calculated in order to examine the relationship between throughput and latency. - * - *

CRC Card
Responsibilities Collaborations
Send many ping - * messages and output timings for sampled individual pings.
- */ -public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** - * Holds test specifics by correlation id. This consists of the expected number of messages and the timing - * controler. - */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingLatencyTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); - } - - /** Compile all the tests into a test suite. */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Latency Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingLatencyTestPerf("testPingLatency")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all - * replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - */ - public void testPingLatency(int numPings) throws Exception - { - _logger.debug("public void testPingLatency(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); - _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these - // messages. - pingClient.setChainedMessageListener(batchedResultsListener); - - // Generate a sample message of the specified size. - Message msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) - { - tc.completeTest(false, 0); - } - - // Remove the chained message listener from the ping producer. - pingClient.removeChainedMessageListener(); - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(messageCorrelationId); - } - - /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can - * be attached to the pinger, in order to receive notifications about every message received and the number - * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener - * outputs a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - private boolean _strictAMQP; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, - AMQSession.STRICT_AMQP_DEFAULT)); - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount, long latency) throws JMSException - { - _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); - - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - tc.completeTest(true, receivedInBatch, latency); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of the - * total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing + * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for + * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from + * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than + * waiting until all expected replies are received. + * + *

This test does not output timings for every single ping message, as when running at high volume, writing the test + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The + * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the + * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * + *

The size parameter logged for each individual ping is set to the size of the batch of messages that the + * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput + * (messages / time) can be calculated in order to examine the relationship between throughput and latency. + * + *

CRC Card
Responsibilities Collaborations
Send many ping + * messages and output timings for sampled individual pings.
+ */ +public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** + * Holds test specifics by correlation id. This consists of the expected number of messages and the timing + * controler. + */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingLatencyTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** Compile all the tests into a test suite. */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Latency Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingLatencyTestPerf("testPingLatency")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all + * replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testPingLatency(int numPings) throws Exception + { + _logger.debug("public void testPingLatency(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + Message msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can + * be attached to the pinger, in order to receive notifications about every message received and the number + * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener + * outputs a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + private boolean _strictAMQP; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, + AMQSession.STRICT_AMQP_DEFAULT)); + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + tc.completeTest(true, receivedInBatch, latency); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of the + * total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java index 2879f0c322..04e723069a 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java @@ -1,93 +1,93 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.ping; - -import java.util.Properties; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.util.CommandLineParser; - -/** - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingSendOnlyClient extends PingDurableClient -{ - private static final Logger log = Logger.getLogger(PingSendOnlyClient.class); - - public PingSendOnlyClient(Properties overrides) throws Exception - { - super(overrides); - } - - /** - * Starts the ping/wait/receive process. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - PingSendOnlyClient pingProducer = new PingSendOnlyClient(options); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.waitForUser("Press return to close connection and quit."); - pingProducer.closeConnection(); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException - { - Message msg = TestMessageFactory.newTextMessage(_producerSession, messageSize); - - // Timestamp the message in nanoseconds. - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - - return msg; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.ping; + +import java.util.Properties; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.util.CommandLineParser; + +/** + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingSendOnlyClient extends PingDurableClient +{ + private static final Logger log = Logger.getLogger(PingSendOnlyClient.class); + + public PingSendOnlyClient(Properties overrides) throws Exception + { + super(overrides); + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingSendOnlyClient pingProducer = new PingSendOnlyClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.waitForUser("Press return to close connection and quit."); + pingProducer.closeConnection(); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException + { + Message msg = TestMessageFactory.newTextMessage(_producerSession, messageSize); + + // Timestamp the message in nanoseconds. + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + return msg; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index fb071a87fe..94b8ea662e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -1,196 +1,196 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import org.apache.qpid.junit.extensions.AsymptoticTestCase; -import org.apache.qpid.junit.extensions.TestThreadAware; -import org.apache.qpid.junit.extensions.util.ParsedProperties; -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -/** - * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times - * simultaneously to simluate many clients/producers/connections. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single - * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, - * except if the connection is lost in which case an attempt to re-establish the setup is made. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the - * temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware -{ - private static Logger _logger = Logger.getLogger(PingTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - /** Holds a property reader to extract the test parameters from. */ - protected ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingTestPerf(String name) - { - super(name); - - _logger.debug("testParameters = " + testParameters); - } - - /** - * Compile all the tests into a test suite. - * @return The test method testPingOk. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingTestPerf("testPingOk")); - - return suite; - } - - public void testPingOk(int numPings) throws Exception - { - if (numPings == 0) - { - Assert.fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - if (perThreadSetup == null) - { - Assert.fail("Could not get per thread test setup, it was null."); - } - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // start the test - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) - { - Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " - + numReplies); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently. - synchronized (this) - { - // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingClient = new PingClient(testParameters); - perThreadSetup._pingClient.establishConnection(true, true); - } - // Start the client connection - perThreadSetup._pingClient.start(); - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) - { - perThreadSetup._pingClient.close(); - } - } - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - finally - { - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping client. - */ - protected PingClient _pingClient; - protected String _correlationId; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.AsymptoticTestCase; +import org.apache.qpid.junit.extensions.TestThreadAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times + * simultaneously to simluate many clients/producers/connections. + * + *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single + * full round trip ping. This test may be scaled up using a suitable JUnit test runner. + * + *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, + * except if the connection is lost in which case an attempt to re-establish the setup is made. + * + *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the + * temporary queue. + * + *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware +{ + private static Logger _logger = Logger.getLogger(PingTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingTestPerf(String name) + { + super(name); + + _logger.debug("testParameters = " + testParameters); + } + + /** + * Compile all the tests into a test suite. + * @return The test method testPingOk. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingTestPerf("testPingOk")); + + return suite; + } + + public void testPingOk(int numPings) throws Exception + { + if (numPings == 0) + { + Assert.fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + if (perThreadSetup == null) + { + Assert.fail("Could not get per thread test setup, it was null."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + + numReplies); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently. + synchronized (this) + { + // Establish a client to ping a Destination and listen the reply back from same Destination + perThreadSetup._pingClient = new PingClient(testParameters); + perThreadSetup._pingClient.establishConnection(true, true); + } + // Start the client connection + perThreadSetup._pingClient.start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) + { + perThreadSetup._pingClient.close(); + } + } + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + finally + { + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping client. + */ + protected PingClient _pingClient; + protected String _correlationId; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java index 0712557383..8e010ccf07 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -1,453 +1,453 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import java.io.IOException; -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.*; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.Session; -import org.apache.qpid.topic.Config; -import org.apache.qpid.exchange.ExchangeDefaults; - -/** - * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return - * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes - * too. - * - *

The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages - * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique - * temporary queue or the correlation id to correlate the original message to the reply. - * - *

There is a verbose mode flag which causes information about each ping to be output to the console - * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should - * be disabled for real timing tests as writing to the console will slow things down. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Bounce back messages to their reply to destination. - *
Provide command line invocation to start the bounce back on a configurable broker url. - *
- * - * @todo Replace the command line parsing with a neater tool. - * - * @todo Make verbose accept a number of messages, only prints to console every X messages. - */ -public class PingPongBouncer implements MessageListener -{ - private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); - - /** The default prefetch size for the message consumer. */ - private static final int PREFETCH = 1; - - /** The default no local flag for the message consumer. */ - private static final boolean NO_LOCAL = true; - - private static final String DEFAULT_DESTINATION_NAME = "ping"; - - /** The default exclusive flag for the message consumer. */ - private static final boolean EXCLUSIVE = false; - - /** A convenient formatter to use when time stamping output. */ - protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ - private boolean _verbose = false; - - /** Determines whether this bounce back client bounces back messages persistently. */ - private boolean _persistent = false; - - private Destination _consumerDestination; - - /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ - private Destination _lastResponseDest; - - /** The producer for sending replies with. */ - private MessageProducer _replyProducer; - - /** The consumer controlSession. */ - private Session _consumerSession; - - /** The producer controlSession. */ - private Session _producerSession; - - /** Holds the connection to the broker. */ - private AMQConnection _connection; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - private boolean _isPubSub = false; - - /** - * This flag is used to indicate that the user should be prompted to kill a broker, in order to test - * failover, immediately before committing a transaction. - */ - protected boolean _failBeforeCommit = false; - - /** - * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test - * failover, immediate after committing a transaction. - */ - protected boolean _failAfterCommit = false; - - /** - * Creates a PingPongBouncer on the specified producer and consumer sessions. - * - * @param brokerDetails The addresses of the brokers to connect to. - * @param username The broker username. - * @param password The broker password. - * @param virtualpath The virtual host name within the broker. - * @param destinationName The name of the queue to receive pings on - * (or root of the queue name where many queues are generated). - * @param persistent A flag to indicate that persistent message should be used. - * @param transacted A flag to indicate that pings should be sent within transactions. - * @param selector A message selector to filter received pings with. - * @param verbose A flag to indicate that message timings should be sent to the console. - * - * @throws Exception All underlying exceptions allowed to fall through. This is only test code... - */ - public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, - String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, - boolean pubsub) throws Exception - { - // Create a client id to uniquely identify this client. - InetAddress address = InetAddress.getLocalHost(); - String clientId = address.getHostName() + System.currentTimeMillis(); - _verbose = verbose; - _persistent = persistent; - setPubSub(pubsub); - // Connect to the broker. - setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); - _logger.info("Connected with URL:" + getConnection().toURL()); - - // Set up the failover notifier. - getConnection().setConnectionListener(new FailoverNotifier()); - - // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the - // command line option. - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - - // Create the queue to listen for message on. - createConsumerDestination(destinationName); - MessageConsumer consumer = - _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); - - // Create a producer for the replies, without a default destination. - _replyProducer = _producerSession.createProducer(null); - _replyProducer.setDisableMessageTimestamp(true); - _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // Set this up to listen for messages on the queue. - consumer.setMessageListener(this); - } - - /** - * Starts a stand alone ping-pong client running in verbose mode. - * - * @param args - */ - public static void main(String[] args) throws Exception - { - System.out.println("Starting..."); - - // Display help on the command line. - if (args.length == 0) - { - _logger.info("Running test with default values..."); - //usage(); - //System.exit(0); - } - - // Extract all command line parameters. - Config config = new Config(); - config.setOptions(args); - String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = "test"; - String destinationName = config.getDestination(); - if (destinationName == null) - { - destinationName = DEFAULT_DESTINATION_NAME; - } - - String selector = config.getSelector(); - boolean transacted = config.isTransacted(); - boolean persistent = config.usePersistentMessages(); - boolean pubsub = config.isPubSub(); - boolean verbose = true; - - //String selector = null; - - // Instantiate the ping pong client with the command line options and start it running. - PingPongBouncer pingBouncer = - new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, - selector, verbose, pubsub); - pingBouncer.getConnection().start(); - - System.out.println("Waiting..."); - } - - private static void usage() - { - System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" - + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" - + "-persistent : (true/false). Default is false\n" - + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); - } - - /** - * This is a callback method that is notified of all messages for which this has been registered as a message - * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to - * destination of the message. - * - * @param message The message that triggered this callback. - */ - public void onMessage(Message message) - { - try - { - String messageCorrelationId = message.getJMSCorrelationID(); - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " - + messageCorrelationId); - } - - // Get the reply to destination from the message and check it is set. - Destination responseDest = message.getJMSReplyTo(); - - if (responseDest == null) - { - _logger.debug("Cannot send reply because reply-to destination is null."); - - return; - } - - // Spew out some timing information if verbose mode is on. - if (_verbose) - { - Long timestamp = message.getLongProperty("timestamp"); - - if (timestamp != null) - { - long diff = System.currentTimeMillis() - timestamp; - _logger.info("Time to bounce point: " + diff); - } - } - - // Correlate the reply to the original. - message.setJMSCorrelationID(messageCorrelationId); - - // Send the receieved message as the pong reply. - _replyProducer.send(responseDest, message); - - if (_verbose) - { - _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " - + messageCorrelationId); - } - - // Commit the transaction if running in transactional mode. - commitTx(_producerSession); - } - catch (JMSException e) - { - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - } - - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public AMQConnection getConnection() - { - return _connection; - } - - /** - * Sets the connection that this ping client is using. - * - * @param connection The ping connection. - */ - public void setConnection(AMQConnection connection) - { - this._connection = connection; - } - - /** - * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. - * - * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. - */ - public void setPubSub(boolean pubsub) - { - _isPubSub = pubsub; - } - - /** - * Checks whether this client is a p2p or pub/sub ping client. - * - * @return true if this client is pinging a topic, false if it is pinging a queue. - */ - public boolean isPubSub() - { - return _isPubSub; - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not - * a transactional controlSession, this method does nothing. - * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the - * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker - * after the commit is applied. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - */ - protected void commitTx(Session session) throws JMSException - { - if (session.getTransacted()) - { - try - { - if (_failBeforeCommit) - { - _logger.debug("Failing Before Commit"); - doFailover(); - } - - session.commit(); - - if (_failAfterCommit) - { - _logger.debug("Failing After Commit"); - doFailover(); - } - - _logger.debug("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - _logger.debug("Message rolled back."); - } - catch (JMSException jmse) - { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - } - - /** - * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - * - * @param broker The name of the broker to terminate. - */ - protected void doFailover(String broker) - { - System.out.println("Kill Broker " + broker + " now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - } - - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now."); - try - { - System.in.read(); - } - catch (IOException e) - { } - - System.out.println("Continuing."); - - } - - private void createConsumerDestination(String name) - { - if (isPubSub()) - { - _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); - } - else - { - _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); - } - } - - /** - * A connection listener that logs out any failover complete events. Could do more interesting things with this - * at some point... - */ - public static class FailoverNotifier implements ConnectionListener - { - public void bytesSent(long count) - { } - - public void bytesReceived(long count) - { } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _logger.info("App got failover complete callback."); - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import java.io.IOException; +import java.net.InetAddress; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.Session; +import org.apache.qpid.topic.Config; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return + * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes + * too. + * + *

The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages + * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique + * temporary queue or the correlation id to correlate the original message to the reply. + * + *

There is a verbose mode flag which causes information about each ping to be output to the console + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should + * be disabled for real timing tests as writing to the console will slow things down. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Bounce back messages to their reply to destination. + *
Provide command line invocation to start the bounce back on a configurable broker url. + *
+ * + * @todo Replace the command line parsing with a neater tool. + * + * @todo Make verbose accept a number of messages, only prints to console every X messages. + */ +public class PingPongBouncer implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); + + /** The default prefetch size for the message consumer. */ + private static final int PREFETCH = 1; + + /** The default no local flag for the message consumer. */ + private static final boolean NO_LOCAL = true; + + private static final String DEFAULT_DESTINATION_NAME = "ping"; + + /** The default exclusive flag for the message consumer. */ + private static final boolean EXCLUSIVE = false; + + /** A convenient formatter to use when time stamping output. */ + protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ + private boolean _verbose = false; + + /** Determines whether this bounce back client bounces back messages persistently. */ + private boolean _persistent = false; + + private Destination _consumerDestination; + + /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ + private Destination _lastResponseDest; + + /** The producer for sending replies with. */ + private MessageProducer _replyProducer; + + /** The consumer controlSession. */ + private Session _consumerSession; + + /** The producer controlSession. */ + private Session _producerSession; + + /** Holds the connection to the broker. */ + private AMQConnection _connection; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + private boolean _isPubSub = false; + + /** + * This flag is used to indicate that the user should be prompted to kill a broker, in order to test + * failover, immediately before committing a transaction. + */ + protected boolean _failBeforeCommit = false; + + /** + * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test + * failover, immediate after committing a transaction. + */ + protected boolean _failAfterCommit = false; + + /** + * Creates a PingPongBouncer on the specified producer and consumer sessions. + * + * @param brokerDetails The addresses of the brokers to connect to. + * @param username The broker username. + * @param password The broker password. + * @param virtualpath The virtual host name within the broker. + * @param destinationName The name of the queue to receive pings on + * (or root of the queue name where many queues are generated). + * @param persistent A flag to indicate that persistent message should be used. + * @param transacted A flag to indicate that pings should be sent within transactions. + * @param selector A message selector to filter received pings with. + * @param verbose A flag to indicate that message timings should be sent to the console. + * + * @throws Exception All underlying exceptions allowed to fall through. This is only test code... + */ + public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, + String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, + boolean pubsub) throws Exception + { + // Create a client id to uniquely identify this client. + InetAddress address = InetAddress.getLocalHost(); + String clientId = address.getHostName() + System.currentTimeMillis(); + _verbose = verbose; + _persistent = persistent; + setPubSub(pubsub); + // Connect to the broker. + setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); + _logger.info("Connected with URL:" + getConnection().toURL()); + + // Set up the failover notifier. + getConnection().setConnectionListener(new FailoverNotifier()); + + // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the + // command line option. + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // Create the queue to listen for message on. + createConsumerDestination(destinationName); + MessageConsumer consumer = + _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); + + // Create a producer for the replies, without a default destination. + _replyProducer = _producerSession.createProducer(null); + _replyProducer.setDisableMessageTimestamp(true); + _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // Set this up to listen for messages on the queue. + consumer.setMessageListener(this); + } + + /** + * Starts a stand alone ping-pong client running in verbose mode. + * + * @param args + */ + public static void main(String[] args) throws Exception + { + System.out.println("Starting..."); + + // Display help on the command line. + if (args.length == 0) + { + _logger.info("Running test with default values..."); + //usage(); + //System.exit(0); + } + + // Extract all command line parameters. + Config config = new Config(); + config.setOptions(args); + String brokerDetails = config.getHost() + ":" + config.getPort(); + String virtualpath = "test"; + String destinationName = config.getDestination(); + if (destinationName == null) + { + destinationName = DEFAULT_DESTINATION_NAME; + } + + String selector = config.getSelector(); + boolean transacted = config.isTransacted(); + boolean persistent = config.usePersistentMessages(); + boolean pubsub = config.isPubSub(); + boolean verbose = true; + + //String selector = null; + + // Instantiate the ping pong client with the command line options and start it running. + PingPongBouncer pingBouncer = + new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, + selector, verbose, pubsub); + pingBouncer.getConnection().start(); + + System.out.println("Waiting..."); + } + + private static void usage() + { + System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + + "-persistent : (true/false). Default is false\n" + + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); + } + + /** + * This is a callback method that is notified of all messages for which this has been registered as a message + * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to + * destination of the message. + * + * @param message The message that triggered this callback. + */ + public void onMessage(Message message) + { + try + { + String messageCorrelationId = message.getJMSCorrelationID(); + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " + + messageCorrelationId); + } + + // Get the reply to destination from the message and check it is set. + Destination responseDest = message.getJMSReplyTo(); + + if (responseDest == null) + { + _logger.debug("Cannot send reply because reply-to destination is null."); + + return; + } + + // Spew out some timing information if verbose mode is on. + if (_verbose) + { + Long timestamp = message.getLongProperty("timestamp"); + + if (timestamp != null) + { + long diff = System.currentTimeMillis() - timestamp; + _logger.info("Time to bounce point: " + diff); + } + } + + // Correlate the reply to the original. + message.setJMSCorrelationID(messageCorrelationId); + + // Send the receieved message as the pong reply. + _replyProducer.send(responseDest, message); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " + + messageCorrelationId); + } + + // Commit the transaction if running in transactional mode. + commitTx(_producerSession); + } + catch (JMSException e) + { + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } + } + + /** + * Gets the underlying connection that this ping client is running on. + * + * @return The underlying connection that this ping client is running on. + */ + public AMQConnection getConnection() + { + return _connection; + } + + /** + * Sets the connection that this ping client is using. + * + * @param connection The ping connection. + */ + public void setConnection(AMQConnection connection) + { + this._connection = connection; + } + + /** + * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. + * + * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. + */ + public void setPubSub(boolean pubsub) + { + _isPubSub = pubsub; + } + + /** + * Checks whether this client is a p2p or pub/sub ping client. + * + * @return true if this client is pinging a topic, false if it is pinging a queue. + */ + public boolean isPubSub() + { + return _isPubSub; + } + + /** + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not + * a transactional controlSession, this method does nothing. + * + *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the + * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker + * after the commit is applied. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + */ + protected void commitTx(Session session) throws JMSException + { + if (session.getTransacted()) + { + try + { + if (_failBeforeCommit) + { + _logger.debug("Failing Before Commit"); + doFailover(); + } + + session.commit(); + + if (_failAfterCommit) + { + _logger.debug("Failing After Commit"); + doFailover(); + } + + _logger.debug("Session Commited."); + } + catch (JMSException e) + { + _logger.trace("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + _logger.debug("Message rolled back."); + } + catch (JMSException jmse) + { + _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + } + + /** + * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + * + * @param broker The name of the broker to terminate. + */ + protected void doFailover(String broker) + { + System.out.println("Kill Broker " + broker + " now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + } + + /** + * Prompts the user to terminate the broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + */ + protected void doFailover() + { + System.out.println("Kill Broker now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + + } + + private void createConsumerDestination(String name) + { + if (isPubSub()) + { + _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); + } + else + { + _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); + } + } + + /** + * A connection listener that logs out any failover complete events. Could do more interesting things with this + * at some point... + */ + public static class FailoverNotifier implements ConnectionListener + { + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback."); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index f328675488..4d8a736ec8 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -1,1720 +1,1720 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.log4j.NDC; - -import org.apache.qpid.test.framework.TestUtils; - -import org.apache.qpid.junit.extensions.BatchedThrottle; -import org.apache.qpid.junit.extensions.Throttle; -import org.apache.qpid.junit.extensions.util.CommandLineParser; -import org.apache.qpid.junit.extensions.util.ParsedProperties; - -import javax.jms.*; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import java.io.*; -import java.net.InetAddress; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -/** - * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may - * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens - * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping - * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour - * configurable. - * - *

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This - * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation - * id in the ping to be bounced back in the reply correlation id. - * - *

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It - * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within - * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover - * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: - * - *

- *
Parameters
Parameter Default Comments - *
messageSize 0 Message size in bytes. Not including any headers. - *
destinationName ping The root name to use to generate destination names to ping. - *
persistent false Determines whether peristent delivery is used. - *
transacted false Determines whether messages are sent/received in transactions. - *
broker tcp://localhost:5672 Determines the broker to connect to. - *
virtualHost test Determines the virtual host to send all ping over. - *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. - *
verbose false The verbose flag for debugging. Prints to console on every message. - *
pubsub false Whether to ping topics or queues. Uses p2p by default. - *
failAfterCommit false Whether to prompt user to kill broker after a commit batch. - *
failBeforeCommit false Whether to prompt user to kill broker before a commit batch. - *
failAfterSend false Whether to prompt user to kill broker after a send. - *
failBeforeSend false Whether to prompt user to kill broker before a send. - *
failOnce true Whether to prompt for failover only once. - *
username guest The username to access the broker with. - *
password guest The password to access the broker with. - *
selector null Not used. Defines a message selector to filter pings with. - *
destinationCount 1 The number of destinations to send pings to. - *
numConsumers 1 The number of consumers on each destination. - *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. - *
commitBatchSize 1 The number of messages per transaction in transactional mode. - *
uniqueDests true Whether each receivers only listens to one ping destination or all. - *
durableDests false Whether or not durable destinations are used. - *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: - * 0 - SESSION_TRANSACTED - * 1 - AUTO_ACKNOWLEDGE - * 2 - CLIENT_ACKNOWLEDGE - * 3 - DUPS_OK_ACKNOWLEDGE - * 257 - NO_ACKNOWLEDGE - * 258 - PRE_ACKNOWLEDGE - *
consTransacted false Whether or not consumers use transactions. Defaults to the same value - * as the 'transacted' option if not seperately defined. - *
consAckMode AUTO_ACK The message acknowledgement mode for consumers. Defaults to the same - * value as 'ackMode' if not seperately defined. - *
maxPending 0 The maximum size in bytes, of messages sent but not yet received. - * Limits the volume of messages currently buffered on the client - * or broker. Can help scale test clients by limiting amount of buffered - * data to avoid out of memory errors. - *
- * - *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop - * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by - * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also - * registered to terminate the ping-pong loop cleanly. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Provide a ping and wait for all responses cycle. - *
Provide command line invocation to loop the ping cycle on a configurable broker url. - *
- * - * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. - * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a - * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last - * message waits until all other messages have been handled before releasing producers but allows messages to be - * processed concurrently, unlike the current synchronized block. - */ -public class PingPongProducer implements Runnable, ExceptionListener -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(PingPongProducer.class); - - /** Holds the name of the property to determine whether of not client id is overridden at connection time. */ - public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; - - /** Holds the default value of the override client id flag. */ - public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; - - /** Holds the name of the property to define the JNDI factory name with. */ - public static final String FACTORY_NAME_PROPNAME = "factoryName"; - - /** Holds the default JNDI name of the connection factory. */ - public static final String FACTORY_NAME_DEAFULT = "local"; - - /** Holds the name of the property to set the JNDI initial context properties with. */ - public static final String FILE_PROPERTIES_PROPNAME = "properties"; - - /** Holds the default file name of the JNDI initial context properties. */ - public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; - - /** Holds the name of the property to get the test message size from. */ - public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; - - /** Used to set up a default message size. */ - public static final int MESSAGE_SIZE_DEAFULT = 0; - - /** Holds the name of the property to get the ping queue name from. */ - public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; - - /** Holds the name of the default destination to send pings on. */ - public static final String PING_QUEUE_NAME_DEFAULT = "ping"; - - /** Holds the name of the property to get the queue name postfix from. */ - public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; - - /** Holds the default queue name postfix value. */ - public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; - - /** Holds the name of the property to get the test delivery mode from. */ - public static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** Holds the message delivery mode to use for the test. */ - public static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** Holds the name of the property to get the test transactional mode from. */ - public static final String TRANSACTED_PROPNAME = "transacted"; - - /** Holds the transactional mode to use for the test. */ - public static final boolean TRANSACTED_DEFAULT = false; - - /** Holds the name of the property to get the test consumer transacted mode from. */ - public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; - - /** Holds the consumer transactional mode default setting. */ - public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; - - /** Holds the name of the property to get the test broker url from. */ - public static final String BROKER_PROPNAME = "broker"; - - /** Holds the default broker url for the test. */ - public static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** Holds the name of the property to get the test broker virtual path. */ - public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; - - /** Holds the default virtual path for the test. */ - public static final String VIRTUAL_HOST_DEFAULT = ""; - - /** Holds the name of the property to get the message rate from. */ - public static final String RATE_PROPNAME = "rate"; - - /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ - public static final int RATE_DEFAULT = 0; - - /** Holds the name of the property to get the verbose mode proeprty from. */ - public static final String VERBOSE_PROPNAME = "verbose"; - - /** Holds the default verbose mode. */ - public static final boolean VERBOSE_DEFAULT = false; - - /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ - public static final String PUBSUB_PROPNAME = "pubsub"; - - /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ - public static final boolean PUBSUB_DEFAULT = false; - - /** Holds the name of the property to get the fail after commit flag from. */ - public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; - - /** Holds the default failover after commit test flag. */ - public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; - - /** Holds the name of the proeprty to get the fail before commit flag from. */ - public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; - - /** Holds the default failover before commit test flag. */ - public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; - - /** Holds the name of the proeprty to get the fail after send flag from. */ - public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; - - /** Holds the default failover after send test flag. */ - public static final boolean FAIL_AFTER_SEND_DEFAULT = false; - - /** Holds the name of the property to get the fail before send flag from. */ - public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; - - /** Holds the default failover before send test flag. */ - public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; - - /** Holds the name of the property to get the fail once flag from. */ - public static final String FAIL_ONCE_PROPNAME = "failOnce"; - - /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ - public static final boolean FAIL_ONCE_DEFAULT = true; - - /** Holds the name of the property to get the broker access username from. */ - public static final String USERNAME_PROPNAME = "username"; - - /** Holds the default broker log on username. */ - public static final String USERNAME_DEFAULT = "guest"; - - /** Holds the name of the property to get the broker access password from. */ - public static final String PASSWORD_PROPNAME = "password"; - - /** Holds the default broker log on password. */ - public static final String PASSWORD_DEFAULT = "guest"; - - /** Holds the name of the proeprty to get the. */ - public static final String SELECTOR_PROPNAME = "selector"; - - /** Holds the default message selector. */ - public static final String SELECTOR_DEFAULT = ""; - - /** Holds the name of the property to get the destination count from. */ - public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; - - /** Defines the default number of destinations to ping. */ - public static final int DESTINATION_COUNT_DEFAULT = 1; - - /** Holds the name of the property to get the number of consumers per destination from. */ - public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; - - /** Defines the default number consumers per destination. */ - public static final int NUM_CONSUMERS_DEFAULT = 1; - - /** Holds the name of the property to get the waiting timeout for response messages. */ - public static final String TIMEOUT_PROPNAME = "timeout"; - - /** Default time to wait before assuming that a ping has timed out. */ - public static final long TIMEOUT_DEFAULT = 30000; - - /** Holds the name of the property to get the commit batch size from. */ - public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; - - /** Defines the default number of pings to send in each transaction when running transactionally. */ - public static final int TX_BATCH_SIZE_DEFAULT = 1; - - /** Holds the name of the property to get the unique destinations flag from. */ - public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; - - /** Defines the default value for the unique destinations property. */ - public static final boolean UNIQUE_DESTS_DEFAULT = true; - - /** Holds the name of the property to get the durable destinations flag from. */ - public static final String DURABLE_DESTS_PROPNAME = "durableDests"; - - /** Defines the default value of the durable destinations flag. */ - public static final boolean DURABLE_DESTS_DEFAULT = false; - - /** Holds the name of the proeprty to get the message acknowledgement mode from. */ - public static final String ACK_MODE_PROPNAME = "ackMode"; - - /** Defines the default message acknowledgement mode. */ - public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - - /** Holds the name of the property to get the consumers message acknowledgement mode from. */ - public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; - - /** Defines the default consumers message acknowledgement mode. */ - public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - - /** Holds the name of the property to get the maximum pending message size setting from. */ - public static final String MAX_PENDING_PROPNAME = "maxPending"; - - /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ - public static final int MAX_PENDING_DEFAULT = 0; - - /** Defines the default prefetch size to use when consuming messages. */ - public static final int PREFETCH_DEFAULT = 100; - - /** Defines the default value of the no local flag to use when consuming messages. */ - public static final boolean NO_LOCAL_DEFAULT = false; - - /** Defines the default value of the exclusive flag to use when consuming messages. */ - public static final boolean EXCLUSIVE_DEFAULT = false; - - /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ - public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; - - /** Holds the default configuration properties. */ - public static ParsedProperties defaults = new ParsedProperties(); - - static - { - defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); - defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); - defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); - defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); - defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); - defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); - defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); - defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); - defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); - defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); - defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); - defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); - defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); - defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); - defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); - defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); - defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); - defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); - defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); - defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); - defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); - defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); - defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); - defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); - defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); - defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); - defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); - defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); - defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); - defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); - } - - /** Allows setting of client ID on the connection, rather than through the connection URL. */ - protected boolean _overrideClientId; - - /** Holds the JNDI name of the JMS connection factory. */ - protected String _factoryName; - - /** Holds the name of the properties file to configure JNDI with. */ - protected String _fileProperties; - - /** Holds the broker url. */ - protected String _brokerDetails; - - /** Holds the username to access the broker with. */ - protected String _username; - - /** Holds the password to access the broker with. */ - protected String _password; - - /** Holds the virtual host on the broker to run the tests through. */ - protected String _virtualpath; - - /** Holds the root name from which to generate test destination names. */ - protected String _destinationName; - - /** Holds the default queue name postfix value. */ - protected String _queueNamePostfix; - - /** Holds the message selector to filter the pings with. */ - protected String _selector; - - /** Holds the producers transactional mode flag. */ - protected boolean _transacted; - - /** Holds the consumers transactional mode flag. */ - protected boolean _consTransacted; - - /** Determines whether this producer sends persistent messages. */ - protected boolean _persistent; - - /** Holds the acknowledgement mode used for the producers. */ - protected int _ackMode; - - /** Holds the acknowledgement mode setting for the consumers. */ - protected int _consAckMode; - - /** Determines what size of messages this producer sends. */ - protected int _messageSize; - - /** Used to indicate that the ping loop should print out whenever it pings. */ - protected boolean _verbose; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - protected boolean _isPubSub; - - /** Flag used to indicate if the destinations should be unique client. */ - protected boolean _isUnique; - - /** Flag used to indicate that durable destination should be used. */ - protected boolean _isDurable; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ - protected boolean _failBeforeCommit; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ - protected boolean _failAfterCommit; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ - protected boolean _failBeforeSend; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ - protected boolean _failAfterSend; - - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ - protected boolean _failOnce; - - /** Holds the number of sends that should be performed in every transaction when using transactions. */ - protected int _txBatchSize; - - /** Holds the number of destinations to ping. */ - protected int _noOfDestinations; - - /** Holds the number of consumers per destination. */ - protected int _noOfConsumers; - - /** Holds the maximum send rate in herz. */ - protected int _rate; - - /** - * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended - * if this limit is breached. - */ - protected int _maxPendingSize; - - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ - private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); - - /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ - private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); - - /** Holds this instances unique id. */ - private int instanceId; - - /** - * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple - * ping producers on the same JVM. - */ - private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** A convenient formatter to use when time stamping output. */ - protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Holds the connection for the message producer. */ - protected Connection _connection; - - /** Holds the consumer connections. */ - protected Connection[] _consumerConnection; - - /** Holds the controlSession on which ping replies are received. */ - protected Session[] _consumerSession; - - /** Holds the producer controlSession, needed to create ping messages. */ - protected Session _producerSession; - - /** Holds the destination where the response messages will arrive. */ - protected Destination _replyDestination; - - /** Holds the set of destinations that this ping producer pings. */ - protected List _pingDestinations; - - /** Used to restrict the sending rate to a specified limit. */ - protected Throttle _rateLimiter; - - /** Holds a message listener that this message listener chains all its messages to. */ - protected ChainedMessageListener _chainedMessageListener = null; - - /** - * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when - * creating multiple ping producers in the same JVM. - */ - protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); - - /** - * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers - * on the same JVM using this id generator will allow them to ping on the same queues. - */ - protected AtomicInteger _queueSharedID = new AtomicInteger(); - - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ - protected boolean _publish = true; - - /** Holds the message producer to send the pings through. */ - protected MessageProducer _producer; - - /** Holds the message consumer to receive the ping replies through. */ - protected MessageConsumer[] _consumer; - - /** The prompt to display when asking the user to kill the broker for failover testing. */ - private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; - - /** Holds the name for this test client to be identified to the broker with. */ - private String _clientID; - - /** Keeps count of the total messages sent purely for debugging purposes. */ - private static AtomicInteger numSent = new AtomicInteger(); - - /** - * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected - * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a - * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an - * equal chance to produce messages. - */ - static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); - - /** Keeps a count of the number of message currently sent but not received. */ - static AtomicInteger _unreceived = new AtomicInteger(0); - - /** - * Creates a ping producer with the specified parameters, of which there are many. See the class level comments - * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on - * it, to send and recieve its pings and replies on. - * - * @param overrides Properties containing any desired overrides to the defaults. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingPongProducer(Properties overrides) throws Exception - { - // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); - instanceId = _instanceIdGenerator.getAndIncrement(); - - // Create a set of parsed properties from the defaults overriden by the passed in values. - ParsedProperties properties = new ParsedProperties(defaults); - properties.putAll(overrides); - - // Extract the configuration properties to set the pinger up with. - _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); - _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); - _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); - _brokerDetails = properties.getProperty(BROKER_PROPNAME); - _username = properties.getProperty(USERNAME_PROPNAME); - _password = properties.getProperty(PASSWORD_PROPNAME); - _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); - _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); - _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); - _selector = properties.getProperty(SELECTOR_PROPNAME); - _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); - _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); - _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); - _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); - _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); - _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); - _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); - _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); - _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); - _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); - _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); - _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); - _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); - _rate = properties.getPropertyAsInteger(RATE_PROPNAME); - _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); - _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); - _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); - _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); - _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); - _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); - - // Check that one or more destinations were specified. - if (_noOfDestinations < 1) - { - throw new IllegalArgumentException("There must be at least one destination."); - } - - // Set up a throttle to control the send rate, if a rate > 0 is specified. - if (_rate > 0) - { - _rateLimiter = new BatchedThrottle(); - _rateLimiter.setRate(_rate); - } - - // Create the connection and message producers/consumers. - // establishConnection(true, true); - } - - /** - * Establishes a connection to the broker and creates message consumers and producers based on the parameters - * that this ping client was created with. - * - * @param producer Flag to indicate whether or not the producer should be set up. - * @param consumer Flag to indicate whether or not the consumers should be set up. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public void establishConnection(boolean producer, boolean consumer) throws Exception - { - // log.debug("public void establishConnection(): called"); - - // Generate a unique identifying name for this client, based on it ip address and the current time. - InetAddress address = InetAddress.getLocalHost(); - // _clientID = address.getHostName() + System.currentTimeMillis(); - _clientID = "perftest_" + instanceId; - - // Create a connection to the broker. - createConnection(_clientID); - - // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = _connection.createSession(_transacted, _ackMode); - - _consumerSession = new Session[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); - } - - // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession[0].createTemporaryQueue(); - createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); - - // Create the message producer only if instructed to. - if (producer) - { - createProducer(); - } - - // Create the message consumer only if instructed to. - if (consumer) - { - createReplyConsumers(getReplyDestinations(), _selector); - } - } - - /** - * Establishes a connection to the broker, based on the configuration parameters that this ping client was - * created with. - * - * @param clientID The clients identifier. - * - * @throws JMSException Underlying exceptions allowed to fall through. - * @throws NamingException Underlying exceptions allowed to fall through. - * @throws IOException Underlying exceptions allowed to fall through. - */ - protected void createConnection(String clientID) throws JMSException, NamingException, IOException - { - // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); - - // _log.debug("Creating a connection for the message producer."); - File propsFile = new File(_fileProperties); - InputStream is = new FileInputStream(propsFile); - Properties properties = new Properties(); - properties.load(is); - - Context context = new InitialContext(properties); - ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); - _connection = factory.createConnection(_username, _password); - - if (_overrideClientId) - { - _connection.setClientID(clientID); - } - - // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); - - _consumerConnection = new Connection[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerConnection[i] = factory.createConnection(_username, _password); - // _consumerConnection[i].setClientID(clientID); - } - } - - /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs - * to be started to bounce the pings back again. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - - // Create a ping producer overriding its defaults with all options passed on the command line. - PingPongProducer pingProducer = new PingPongProducer(options); - pingProducer.establishConnection(true, true); - - // Start the ping producers dispatch thread running. - pingProducer._connection.start(); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingProducer._connection.setExceptionListener(pingProducer); - - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingProducer); - pingThread.run(); - pingThread.join(); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Convenience method for a short pause. - * - * @param sleepTime The time in milliseconds to pause for. - */ - public static void pause(long sleepTime) - { - if (sleepTime > 0) - { - try - { - Thread.sleep(sleepTime); - } - catch (InterruptedException ie) - { } - } - } - - /** - * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to - * destination of this pinger. - * - * @return The single reply to destination of this pinger, wrapped in a list. - */ - public List getReplyDestinations() - { - // log.debug("public List getReplyDestinations(): called"); - - List replyDestinations = new ArrayList(); - replyDestinations.add(_replyDestination); - - // log.debug("replyDestinations = " + replyDestinations); - - return replyDestinations; - } - - /** - * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery - * flag is set accoring the ping producer creation options. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void createProducer() throws JMSException - { - // log.debug("public void createProducer(): called"); - - _producer = (MessageProducer) _producerSession.createProducer(null); - _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); - } - - /** - * Creates consumers for the specified number of destinations. The destinations themselves are also created by this - * method. - * - * @param noOfDestinations The number of destinations to create consumers for. - * @param selector The message selector to filter the consumers with. - * @param rootName The root of the name, or actual name if only one is being created. - * @param unique true to make the destinations unique to this pinger, false to share the - * numbering with all pingers on the same JVM. - * @param durable If the destinations are durable topics. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, - boolean durable) throws JMSException - { - /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called");*/ - - _pingDestinations = new ArrayList(); - - // Create the desired number of ping destinations and consumers for them. - // log.debug("Creating " + noOfDestinations + " destinations to ping."); - - for (int i = 0; i < noOfDestinations; i++) - { - Destination destination; - String id; - - // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. - if (unique) - { - // log.debug("Creating unique destinations."); - id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); - } - else - { - // log.debug("Creating shared destinations."); - id = "_" + _queueSharedID.incrementAndGet(); - } - - // Check if this is a pub/sub pinger, in which case create topics. - if (_isPubSub) - { - destination = _producerSession.createTopic(rootName + id); - // log.debug("Created non-durable topic " + destination); - - if (durable) - { - _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); - } - } - // Otherwise this is a p2p pinger, in which case create queues. - else - { - destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); - // log.debug("Created queue " + destination); - } - - // Keep the destination. - _pingDestinations.add(destination); - } - } - - /** - * Creates consumers for the specified destinations and registers this pinger to listen to their messages. - * - * @param destinations The destinations to listen to. - * @param selector A selector to filter the messages with. - * - * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. - */ - public void createReplyConsumers(Collection destinations, String selector) throws JMSException - { - /*log.debug("public void createReplyConsumers(Collection destinations = " + destinations - + ", String selector = " + selector + "): called");*/ - - log.debug("There are " + destinations.size() + " destinations."); - log.debug("Creating " + _noOfConsumers + " consumers on each destination."); - log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); - - for (Destination destination : destinations) - { - _consumer = new MessageConsumer[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - // Create a consumer for the destination and set this pinger to listen to its messages. - _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); - - final int consumerNo = i; - - _consumer[i].setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - onMessageWithConsumerNo(message, consumerNo); - } - }); - - log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); - } - } - } - - /** - * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a - * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the - * replies map. - * - * @param message The received message. - * @param consumerNo The consumer number within this test pinger instance. - */ - public void onMessageWithConsumerNo(Message message, int consumerNo) - { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); - try - { - long now = System.nanoTime(); - long timestamp = getTimestamp(message); - long pingTime = now - timestamp; - - // NDC.push("id" + instanceId + "/cons" + consumerNo); - - // Extract the messages correlation id. - String correlationID = message.getJMSCorrelationID(); - // log.debug("correlationID = " + correlationID); - - // int num = message.getIntProperty("MSG_NUM"); - // log.info("Message " + num + " received."); - - boolean isRedelivered = message.getJMSRedelivered(); - // log.debug("isRedelivered = " + isRedelivered); - - if (!isRedelivered) - { - // Countdown on the traffic light if there is one for the matching correlation id. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - - if (perCorrelationId != null) - { - CountDownLatch trafficLight = perCorrelationId.trafficLight; - - // Restart the timeout timer on every message. - perCorrelationId.timeOutStart = System.nanoTime(); - - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); - - // Release waiting senders if there are some and using maxPending limit. - if ((_maxPendingSize > 0)) - { - // Decrement the count of sent but not yet received messages. - int unreceived = _unreceived.decrementAndGet(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) - / (_isPubSub ? getConsumersPerDestination() : 1); - - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - - // synchronized (_sendPauseMonitor) - // { - if (unreceivedSize < _maxPendingSize) - { - _sendPauseMonitor.poll(); - } - // } - } - - // Decrement the countdown latch. Before this point, it is possible that two threads might enter this - // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block - // ensures that each thread will get a unique value for the remaining messages. - long trueCount; - long remainingCount; - - synchronized (trafficLight) - { - trafficLight.countDown(); - - trueCount = trafficLight.getCount(); - remainingCount = trueCount - 1; - - // NDC.push("/rem" + remainingCount); - - // log.debug("remainingCount = " + remainingCount); - // log.debug("trueCount = " + trueCount); - - // Commit on transaction batch size boundaries. At this point in time the waiting producer - // remains blocked, even on the last message. - // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on - // each batch boundary. For pub/sub each consumer gets every message so no division is done. - // When running in client ack mode, an ack is done instead of a commit, on the commit batch - // size boundaries. - long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); - // log.debug("commitCount = " + commitCount); - - if ((commitCount % _txBatchSize) == 0) - { - if (_consAckMode == 2) - { - // log.debug("Doing client ack for consumer " + consumerNo + "."); - message.acknowledge(); - } - else - { - // log.debug("Trying commit for consumer " + consumerNo + "."); - commitTx(_consumerSession[consumerNo]); - // log.info("Tx committed on consumer " + consumerNo); - } - } - - // Forward the message and remaining count to any interested chained message listener. - if (_chainedMessageListener != null) - { - _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); - } - - // Check if this is the last message, in which case release any waiting producers. This is done - // after the transaction has been committed and any listeners notified. - if (trueCount == 1) - { - trafficLight.countDown(); - } - } - } - else - { - log.warn("Got unexpected message with correlationId: " + correlationID); - } - } - else - { - log.warn("Got redelivered message, ignoring."); - } - } - catch (JMSException e) - { - log.warn("There was a JMSException: " + e.getMessage(), e); - } - finally - { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); - // NDC.clear(); - } - } - - /** - * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out - * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify - * the correlation id. - * - * @param message The message to send. If this is null, one is generated. - * @param numPings The number of ping messages to send. - * @param timeout The timeout in milliseconds. - * @param messageCorrelationId The message correlation id. If this is null, one is generated. - * - * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait - * for all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - * @throws InterruptedException When interrupted by a timeout - */ - public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException - { - /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ - - // Generate a unique correlation id to put on the messages before sending them, if one was not specified. - if (messageCorrelationId == null) - { - messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); - } - - try - { - // NDC.push("prod"); - - // Create a count down latch to count the number of replies with. This is created before the messages are - // sent so that the replies cannot be received before the count down is created. - // One is added to this, so that the last reply becomes a special case. The special case is that the - // chained message listener must be called before this sender can be unblocked, but that decrementing the - // countdown needs to be done before the chained listener can be called. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - - perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Set up the current time as the start time for pinging on the correlation id. This is used to determine - // timeouts. - perCorrelationId.timeOutStart = System.nanoTime(); - - // Send the specifed number of messages. - pingNoWaitForReply(message, numPings, messageCorrelationId); - - boolean timedOut; - boolean allMessagesReceived; - int numReplies; - - do - { - // Block the current thread until replies to all the messages are received, or it times out. - perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); - - // Work out how many replies were receieved. - numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); - - allMessagesReceived = numReplies == getExpectedNumPings(numPings); - - // log.debug("numReplies = " + numReplies); - // log.debug("allMessagesReceived = " + allMessagesReceived); - - // Recheck the timeout condition. - long now = System.nanoTime(); - long lastMessageReceievedAt = perCorrelationId.timeOutStart; - timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - - // log.debug("now = " + now); - // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); - } - while (!timedOut && !allMessagesReceived); - - if ((numReplies < getExpectedNumPings(numPings)) && _verbose) - { - log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); - } - else if (_verbose) - { - log.info("Got all replies on id, " + messageCorrelationId); - } - - // commitTx(_consumerSession); - - // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); - - return numReplies; - } - // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, - // so will be a memory leak if this is not done. - finally - { - // NDC.pop(); - perCorrelationIds.remove(messageCorrelationId); - } - } - - /** - * Sends the specified number of ping messages and does not wait for correlating replies. - * - * @param message The message to send. - * @param numPings The number of pings to send. - * @param messageCorrelationId A correlation id to place on all messages sent. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException - { - /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ - - if (message == null) - { - message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); - } - - message.setJMSCorrelationID(messageCorrelationId); - - // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the - // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is - // needed. - boolean committed = false; - - // Send all of the ping messages. - for (int i = 0; i < numPings; i++) - { - // Re-timestamp the message. - // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - - // Send the message, passing in the message count. - committed = sendMessage(i, message); - - // Spew out per message timings on every message sonly in verbose mode. - /*if (_verbose) - { - log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); - }*/ - } - - // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. - if (!committed) - { - commitTx(_producerSession); - } - } - - /** - * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of - * messages sent so far must be specified and is used to round robin the ping destinations (where there are more - * than one), and to determine if the transaction batch size has been reached and the sent messages should be - * committed. - * - * @param i The count of messages sent so far in a loop of multiple calls to this send method. - * @param message The message to send. - * - * @return true if the messages were committed, false otherwise. - * - * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. - */ - protected boolean sendMessage(int i, Message message) throws JMSException - { - try - { - NDC.push("id" + instanceId + "/prod"); - - // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); - // log.debug("_txBatchSize = " + _txBatchSize); - - // Round robin the destinations as the messages are sent. - Destination destination = _pingDestinations.get(i % _pingDestinations.size()); - - // Prompt the user to kill the broker when doing failover testing. - _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); - - // Get the test setup for the correlation id. - String correlationID = message.getJMSCorrelationID(); - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - - // If necessary, wait until the max pending message size comes within its limit. - if (_maxPendingSize > 0) - { - synchronized (_sendPauseMonitor) - { - // Used to keep track of the number of times that send has to wait. - int numWaits = 0; - - // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with - // the test timeout. - int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); - - while (true) - { - // Get the size estimate of sent but not yet received messages. - int unreceived = _unreceived.get(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) - / (_isPubSub ? getConsumersPerDestination() : 1); - - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - // log.debug("_maxPendingSize = " + _maxPendingSize); - - if (unreceivedSize > _maxPendingSize) - { - // log.debug("unreceived size estimate over limit = " + unreceivedSize); - - // Fail the test if the send has had to wait more than the maximum allowed number of times. - if (numWaits > waitLimit) - { - String errorMessage = - "Send has had to wait for the unreceivedSize (" + unreceivedSize - + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit - + " times."; - log.warn(errorMessage); - throw new RuntimeException(errorMessage); - } - - // Wait on the send pause barrier for the limit to be re-established. - try - { - long start = System.nanoTime(); - // _sendPauseMonitor.wait(10000); - _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); - long end = System.nanoTime(); - - // Count the wait only if it was for > 99% of the requested wait time. - if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) - { - numWaits++; - } - } - catch (InterruptedException e) - { - // Restore the interrupted status - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - else - { - break; - } - } - } - } - - // Send the message either to its round robin destination, or its default destination. - // int num = numSent.incrementAndGet(); - // message.setIntProperty("MSG_NUM", num); - setTimestamp(message); - - if (destination == null) - { - _producer.send(message); - } - else - { - _producer.send(destination, message); - } - - // Increase the unreceived size, this may actually happen after the message is received. - // The unreceived size is incremented by the number of consumers that will get a copy of the message, - // in pub/sub mode. - if (_maxPendingSize > 0) - { - int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); - // log.debug("newUnreceivedCount = " + newUnreceivedCount); - } - - // Apply message rate throttling if a rate limit has been set up. - if (_rateLimiter != null) - { - _rateLimiter.throttle(); - } - - // Call commit every time the commit batch size is reached. - boolean committed = false; - - // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. - if (((i + 1) % _txBatchSize) == 0) - { - // log.debug("Trying commit on producer session."); - committed = commitTx(_producerSession); - } - - return committed; - } - finally - { - NDC.clear(); - } - } - - /** - * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the - * test that the failure has occurred, before the method returns. - * - * @param failFlag The fail flag to test. - * - * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only - * used once, then reset. - */ - private boolean waitForUserToPromptOnFailure(boolean failFlag) - { - if (failFlag) - { - if (_failOnce) - { - failFlag = false; - } - - // log.debug("Failing Before Send"); - waitForUser(KILL_BROKER_PROMPT); - } - - return failFlag; - } - - /** - * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch - * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will - * terminate the pinger. - */ - public void pingLoop() - { - try - { - // Generate a sample message and time stamp it. - Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); - // setTimestamp(msg); - - // Send the message and wait for a reply. - pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); - } - catch (JMSException e) - { - _publish = false; - // log.debug("There was a JMSException: " + e.getMessage(), e); - } - catch (InterruptedException e) - { - _publish = false; - // log.debug("There was an interruption: " + e.getMessage(), e); - } - } - - /** - * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set - * here. - * - * @param messageListener The chained message listener. - */ - public void setChainedMessageListener(ChainedMessageListener messageListener) - { - _chainedMessageListener = messageListener; - } - - /** Removes any chained message listeners from this pinger. */ - public void removeChainedMessageListener() - { - _chainedMessageListener = null; - } - - /** - * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. - * - * @param replyQueue The reply-to destination for the message. - * @param messageSize The desired size of the message in bytes. - * @param persistent true if the message should use persistent delivery, false otherwise. - * - * @return A freshly generated test message. - * - * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. - */ - public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException - { - // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); - return TestUtils.createTestMessageOfSize(_producerSession, messageSize); - } - - /** - * Sets the current time in nanoseconds as the timestamp on the message. - * - * @param msg The message to timestamp. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - protected void setTimestamp(Message msg) throws JMSException - { - /*if (((AMQSession)_producerSession).isStrictAMQP()) - { - ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); - } - else - {*/ - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - // } - } - - /** - * Extracts the nanosecond timestamp from a message. - * - * @param msg The message to extract the time stamp from. - * - * @return The timestamp in nanos. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - protected long getTimestamp(Message msg) throws JMSException - { - /*if (((AMQSession)_producerSession).isStrictAMQP()) - { - Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); - - return (value == null) ? 0L : value; - } - else - {*/ - return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - // } - } - - /** - * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag - * has been cleared. - */ - public void stop() - { - _publish = false; - } - - /** - * Starts the producer and consumer connections. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void start() throws JMSException - { - // log.debug("public void start(): called"); - - _connection.start(); - // log.debug("Producer started."); - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerConnection[i].start(); - // log.debug("Consumer " + i + " started."); - } - } - - /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ - public void run() - { - // Keep running until the publish flag is cleared. - while (_publish) - { - pingLoop(); - } - } - - /** - * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the - * connection, this clears the publish flag which in turn will halt the ping loop. - * - * @param e The exception that triggered this callback method. - */ - public void onException(JMSException e) - { - // log.debug("public void onException(JMSException e = " + e + "): called", e); - _publish = false; - } - - /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered - * with the runtime system as a shutdown hook. - * - * @return A shutdown hook for the ping loop. - */ - public Thread getShutdownHook() - { - return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); - } - - /** - * Closes all of the producer and consumer connections. - * - * @throws JMSException All JMSException are allowed to fall through. - */ - public void close() throws JMSException - { - // log.debug("public void close(): called"); - - try - { - if (_connection != null) - { - // log.debug("Before close producer connection."); - _connection.close(); - // log.debug("Closed producer connection."); - } - - for (int i = 0; i < _noOfConsumers; i++) - { - if (_consumerConnection[i] != null) - { - // log.debug("Before close consumer connection " + i + "."); - _consumerConnection[i].close(); - // log.debug("Closed consumer connection " + i + "."); - } - } - } - finally - { - _connection = null; - _producerSession = null; - _consumerSession = null; - _consumerConnection = null; - _producer = null; - _consumer = null; - _pingDestinations = null; - _replyDestination = null; - } - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a - * transactional controlSession, this method does nothing (unless the failover after send flag is set). - * - *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is - * applied. This flag applies whether the pinger is transactional or not. - * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit - * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the - * commit is applied. These flags will only apply if using a transactional pinger. - * - * @param session The controlSession to commit - * - * @return true if the controlSession was committed, false if it was not. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional and - * non-transactional alike. - */ - protected boolean commitTx(Session session) throws JMSException - { - // log.debug("protected void commitTx(Session session): called"); - - boolean committed = false; - - _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); - - if (session.getTransacted()) - { - // log.debug("Session is transacted."); - - try - { - _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); - - long start = System.nanoTime(); - session.commit(); - committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); - - _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); - - // log.debug("Session Commited."); - } - catch (JMSException e) - { - // log.debug("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - // log.debug("Message rolled back."); - } - catch (JMSException jmse) - { - // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - - return committed; - } - - /** - * Outputs a prompt to the console and waits for the user to press return. - * - * @param prompt The prompt to display on the console. - */ - public void waitForUser(String prompt) - { - System.out.println(prompt); - - try - { - System.in.read(); - } - catch (IOException e) - { - // Ignored. - } - - System.out.println("Continuing."); - } - - /** - * Gets the number of consumers that are listening to each destination in the test. - * - * @return int The number of consumers subscribing to each topic. - */ - public int getConsumersPerDestination() - { - return _noOfConsumers; - } - - /** - * Calculates how many pings are expected to be received for the given number sent. - * - * @param numpings The number of pings that will be sent. - * - * @return The number that should be received, for the test to pass. - */ - public int getExpectedNumPings(int numpings) - { - // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); - - // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); - - return numpings * (_isPubSub ? getConsumersPerDestination() : 1); - } - - /** - * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link - * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link - * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of - * messages with that correlation id. - * - *

Provided only one pinger is producing messages with that correlation id, the chained listener will always be - * given unique message counts. It will always be called while the producer waiting for all messages to arrive is - * still blocked. - */ - public static interface ChainedMessageListener - { - /** - * Notifies interested listeners about message arrival and important test stats, the number of messages - * remaining in the test, and the messages send timestamp. - * - * @param message The newly arrived message. - * @param remainingCount The number of messages left to complete the test. - * @param latency The nanosecond latency of the message. - * - * @throws JMSException Any JMS exceptions is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount, long latency) throws JMSException; - } - - /** - * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be - * added to this: read/write lock to make onMessage more concurrent as described in class header comment. - */ - protected static class PerCorrelationId - { - /** Holds a countdown on number of expected messages. */ - CountDownLatch trafficLight; - - /** Holds the last timestamp that the timeout was reset to. */ - Long timeOutStart; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.test.framework.TestUtils; + +import org.apache.qpid.junit.extensions.BatchedThrottle; +import org.apache.qpid.junit.extensions.Throttle; +import org.apache.qpid.junit.extensions.util.CommandLineParser; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import java.io.*; +import java.net.InetAddress; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may + * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens + * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping + * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour + * configurable. + * + *

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This + * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation + * id in the ping to be bounced back in the reply correlation id. + * + *

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It + * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within + * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover + * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: + * + *

+ *
Parameters
Parameter Default Comments + *
messageSize 0 Message size in bytes. Not including any headers. + *
destinationName ping The root name to use to generate destination names to ping. + *
persistent false Determines whether peristent delivery is used. + *
transacted false Determines whether messages are sent/received in transactions. + *
broker tcp://localhost:5672 Determines the broker to connect to. + *
virtualHost test Determines the virtual host to send all ping over. + *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. + *
verbose false The verbose flag for debugging. Prints to console on every message. + *
pubsub false Whether to ping topics or queues. Uses p2p by default. + *
failAfterCommit false Whether to prompt user to kill broker after a commit batch. + *
failBeforeCommit false Whether to prompt user to kill broker before a commit batch. + *
failAfterSend false Whether to prompt user to kill broker after a send. + *
failBeforeSend false Whether to prompt user to kill broker before a send. + *
failOnce true Whether to prompt for failover only once. + *
username guest The username to access the broker with. + *
password guest The password to access the broker with. + *
selector null Not used. Defines a message selector to filter pings with. + *
destinationCount 1 The number of destinations to send pings to. + *
numConsumers 1 The number of consumers on each destination. + *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. + *
commitBatchSize 1 The number of messages per transaction in transactional mode. + *
uniqueDests true Whether each receivers only listens to one ping destination or all. + *
durableDests false Whether or not durable destinations are used. + *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE + *
consTransacted false Whether or not consumers use transactions. Defaults to the same value + * as the 'transacted' option if not seperately defined. + *
consAckMode AUTO_ACK The message acknowledgement mode for consumers. Defaults to the same + * value as 'ackMode' if not seperately defined. + *
maxPending 0 The maximum size in bytes, of messages sent but not yet received. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. + *
+ * + *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop + * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by + * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also + * registered to terminate the ping-pong loop cleanly. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide a ping and wait for all responses cycle. + *
Provide command line invocation to loop the ping cycle on a configurable broker url. + *
+ * + * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. + * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a + * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last + * message waits until all other messages have been handled before releasing producers but allows messages to be + * processed concurrently, unlike the current synchronized block. + */ +public class PingPongProducer implements Runnable, ExceptionListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(PingPongProducer.class); + + /** Holds the name of the property to determine whether of not client id is overridden at connection time. */ + public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; + + /** Holds the default value of the override client id flag. */ + public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; + + /** Holds the name of the property to define the JNDI factory name with. */ + public static final String FACTORY_NAME_PROPNAME = "factoryName"; + + /** Holds the default JNDI name of the connection factory. */ + public static final String FACTORY_NAME_DEAFULT = "local"; + + /** Holds the name of the property to set the JNDI initial context properties with. */ + public static final String FILE_PROPERTIES_PROPNAME = "properties"; + + /** Holds the default file name of the JNDI initial context properties. */ + public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; + + /** Holds the name of the property to get the test message size from. */ + public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + + /** Used to set up a default message size. */ + public static final int MESSAGE_SIZE_DEAFULT = 0; + + /** Holds the name of the property to get the ping queue name from. */ + public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; + + /** Holds the name of the default destination to send pings on. */ + public static final String PING_QUEUE_NAME_DEFAULT = "ping"; + + /** Holds the name of the property to get the queue name postfix from. */ + public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; + + /** Holds the default queue name postfix value. */ + public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; + + /** Holds the name of the property to get the test delivery mode from. */ + public static final String PERSISTENT_MODE_PROPNAME = "persistent"; + + /** Holds the message delivery mode to use for the test. */ + public static final boolean PERSISTENT_MODE_DEFAULT = false; + + /** Holds the name of the property to get the test transactional mode from. */ + public static final String TRANSACTED_PROPNAME = "transacted"; + + /** Holds the transactional mode to use for the test. */ + public static final boolean TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to get the test consumer transacted mode from. */ + public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; + + /** Holds the consumer transactional mode default setting. */ + public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "tcp://localhost:5672"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the message rate from. */ + public static final String RATE_PROPNAME = "rate"; + + /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + public static final int RATE_DEFAULT = 0; + + /** Holds the name of the property to get the verbose mode proeprty from. */ + public static final String VERBOSE_PROPNAME = "verbose"; + + /** Holds the default verbose mode. */ + public static final boolean VERBOSE_DEFAULT = false; + + /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ + public static final String PUBSUB_PROPNAME = "pubsub"; + + /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + public static final boolean PUBSUB_DEFAULT = false; + + /** Holds the name of the property to get the fail after commit flag from. */ + public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; + + /** Holds the default failover after commit test flag. */ + public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; + + /** Holds the name of the proeprty to get the fail before commit flag from. */ + public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; + + /** Holds the default failover before commit test flag. */ + public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; + + /** Holds the name of the proeprty to get the fail after send flag from. */ + public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; + + /** Holds the default failover after send test flag. */ + public static final boolean FAIL_AFTER_SEND_DEFAULT = false; + + /** Holds the name of the property to get the fail before send flag from. */ + public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; + + /** Holds the default failover before send test flag. */ + public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; + + /** Holds the name of the property to get the fail once flag from. */ + public static final String FAIL_ONCE_PROPNAME = "failOnce"; + + /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ + public static final boolean FAIL_ONCE_DEFAULT = true; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + /** Holds the name of the proeprty to get the. */ + public static final String SELECTOR_PROPNAME = "selector"; + + /** Holds the default message selector. */ + public static final String SELECTOR_DEFAULT = ""; + + /** Holds the name of the property to get the destination count from. */ + public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; + + /** Defines the default number of destinations to ping. */ + public static final int DESTINATION_COUNT_DEFAULT = 1; + + /** Holds the name of the property to get the number of consumers per destination from. */ + public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; + + /** Defines the default number consumers per destination. */ + public static final int NUM_CONSUMERS_DEFAULT = 1; + + /** Holds the name of the property to get the waiting timeout for response messages. */ + public static final String TIMEOUT_PROPNAME = "timeout"; + + /** Default time to wait before assuming that a ping has timed out. */ + public static final long TIMEOUT_DEFAULT = 30000; + + /** Holds the name of the property to get the commit batch size from. */ + public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; + + /** Defines the default number of pings to send in each transaction when running transactionally. */ + public static final int TX_BATCH_SIZE_DEFAULT = 1; + + /** Holds the name of the property to get the unique destinations flag from. */ + public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; + + /** Defines the default value for the unique destinations property. */ + public static final boolean UNIQUE_DESTS_DEFAULT = true; + + /** Holds the name of the property to get the durable destinations flag from. */ + public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + + /** Defines the default value of the durable destinations flag. */ + public static final boolean DURABLE_DESTS_DEFAULT = false; + + /** Holds the name of the proeprty to get the message acknowledgement mode from. */ + public static final String ACK_MODE_PROPNAME = "ackMode"; + + /** Defines the default message acknowledgement mode. */ + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /** Holds the name of the property to get the consumers message acknowledgement mode from. */ + public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; + + /** Defines the default consumers message acknowledgement mode. */ + public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /** Holds the name of the property to get the maximum pending message size setting from. */ + public static final String MAX_PENDING_PROPNAME = "maxPending"; + + /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ + public static final int MAX_PENDING_DEFAULT = 0; + + /** Defines the default prefetch size to use when consuming messages. */ + public static final int PREFETCH_DEFAULT = 100; + + /** Defines the default value of the no local flag to use when consuming messages. */ + public static final boolean NO_LOCAL_DEFAULT = false; + + /** Defines the default value of the exclusive flag to use when consuming messages. */ + public static final boolean EXCLUSIVE_DEFAULT = false; + + /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ + public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + + /** Holds the default configuration properties. */ + public static ParsedProperties defaults = new ParsedProperties(); + + static + { + defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); + defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); + defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); + defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + } + + /** Allows setting of client ID on the connection, rather than through the connection URL. */ + protected boolean _overrideClientId; + + /** Holds the JNDI name of the JMS connection factory. */ + protected String _factoryName; + + /** Holds the name of the properties file to configure JNDI with. */ + protected String _fileProperties; + + /** Holds the broker url. */ + protected String _brokerDetails; + + /** Holds the username to access the broker with. */ + protected String _username; + + /** Holds the password to access the broker with. */ + protected String _password; + + /** Holds the virtual host on the broker to run the tests through. */ + protected String _virtualpath; + + /** Holds the root name from which to generate test destination names. */ + protected String _destinationName; + + /** Holds the default queue name postfix value. */ + protected String _queueNamePostfix; + + /** Holds the message selector to filter the pings with. */ + protected String _selector; + + /** Holds the producers transactional mode flag. */ + protected boolean _transacted; + + /** Holds the consumers transactional mode flag. */ + protected boolean _consTransacted; + + /** Determines whether this producer sends persistent messages. */ + protected boolean _persistent; + + /** Holds the acknowledgement mode used for the producers. */ + protected int _ackMode; + + /** Holds the acknowledgement mode setting for the consumers. */ + protected int _consAckMode; + + /** Determines what size of messages this producer sends. */ + protected int _messageSize; + + /** Used to indicate that the ping loop should print out whenever it pings. */ + protected boolean _verbose; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + protected boolean _isPubSub; + + /** Flag used to indicate if the destinations should be unique client. */ + protected boolean _isUnique; + + /** Flag used to indicate that durable destination should be used. */ + protected boolean _isDurable; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + protected boolean _failBeforeCommit; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + protected boolean _failAfterCommit; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + protected boolean _failBeforeSend; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + protected boolean _failAfterSend; + + /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + protected boolean _failOnce; + + /** Holds the number of sends that should be performed in every transaction when using transactions. */ + protected int _txBatchSize; + + /** Holds the number of destinations to ping. */ + protected int _noOfDestinations; + + /** Holds the number of consumers per destination. */ + protected int _noOfConsumers; + + /** Holds the maximum send rate in herz. */ + protected int _rate; + + /** + * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended + * if this limit is breached. + */ + protected int _maxPendingSize; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ + private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); + + /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ + private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); + + /** Holds this instances unique id. */ + private int instanceId; + + /** + * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple + * ping producers on the same JVM. + */ + private static Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** A convenient formatter to use when time stamping output. */ + protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + + /** Holds the connection for the message producer. */ + protected Connection _connection; + + /** Holds the consumer connections. */ + protected Connection[] _consumerConnection; + + /** Holds the controlSession on which ping replies are received. */ + protected Session[] _consumerSession; + + /** Holds the producer controlSession, needed to create ping messages. */ + protected Session _producerSession; + + /** Holds the destination where the response messages will arrive. */ + protected Destination _replyDestination; + + /** Holds the set of destinations that this ping producer pings. */ + protected List _pingDestinations; + + /** Used to restrict the sending rate to a specified limit. */ + protected Throttle _rateLimiter; + + /** Holds a message listener that this message listener chains all its messages to. */ + protected ChainedMessageListener _chainedMessageListener = null; + + /** + * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when + * creating multiple ping producers in the same JVM. + */ + protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); + + /** + * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers + * on the same JVM using this id generator will allow them to ping on the same queues. + */ + protected AtomicInteger _queueSharedID = new AtomicInteger(); + + /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + protected boolean _publish = true; + + /** Holds the message producer to send the pings through. */ + protected MessageProducer _producer; + + /** Holds the message consumer to receive the ping replies through. */ + protected MessageConsumer[] _consumer; + + /** The prompt to display when asking the user to kill the broker for failover testing. */ + private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; + + /** Holds the name for this test client to be identified to the broker with. */ + private String _clientID; + + /** Keeps count of the total messages sent purely for debugging purposes. */ + private static AtomicInteger numSent = new AtomicInteger(); + + /** + * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected + * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a + * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an + * equal chance to produce messages. + */ + static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); + + /** Keeps a count of the number of message currently sent but not received. */ + static AtomicInteger _unreceived = new AtomicInteger(0); + + /** + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on + * it, to send and recieve its pings and replies on. + * + * @param overrides Properties containing any desired overrides to the defaults. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingPongProducer(Properties overrides) throws Exception + { + // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + instanceId = _instanceIdGenerator.getAndIncrement(); + + // Create a set of parsed properties from the defaults overriden by the passed in values. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + // Extract the configuration properties to set the pinger up with. + _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); + _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); + _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); + _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _username = properties.getProperty(USERNAME_PROPNAME); + _password = properties.getProperty(PASSWORD_PROPNAME); + _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); + _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); + _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); + _selector = properties.getProperty(SELECTOR_PROPNAME); + _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); + _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); + _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); + _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); + _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); + _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); + _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); + _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); + _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); + _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); + _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); + _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); + _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); + _rate = properties.getPropertyAsInteger(RATE_PROPNAME); + _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); + _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); + _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); + _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); + _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); + + // Check that one or more destinations were specified. + if (_noOfDestinations < 1) + { + throw new IllegalArgumentException("There must be at least one destination."); + } + + // Set up a throttle to control the send rate, if a rate > 0 is specified. + if (_rate > 0) + { + _rateLimiter = new BatchedThrottle(); + _rateLimiter.setRate(_rate); + } + + // Create the connection and message producers/consumers. + // establishConnection(true, true); + } + + /** + * Establishes a connection to the broker and creates message consumers and producers based on the parameters + * that this ping client was created with. + * + * @param producer Flag to indicate whether or not the producer should be set up. + * @param consumer Flag to indicate whether or not the consumers should be set up. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public void establishConnection(boolean producer, boolean consumer) throws Exception + { + // log.debug("public void establishConnection(): called"); + + // Generate a unique identifying name for this client, based on it ip address and the current time. + InetAddress address = InetAddress.getLocalHost(); + // _clientID = address.getHostName() + System.currentTimeMillis(); + _clientID = "perftest_" + instanceId; + + // Create a connection to the broker. + createConnection(_clientID); + + // Create transactional or non-transactional sessions, based on the command line arguments. + _producerSession = _connection.createSession(_transacted, _ackMode); + + _consumerSession = new Session[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); + } + + // Create the destinations to send pings to and receive replies from. + _replyDestination = _consumerSession[0].createTemporaryQueue(); + createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); + + // Create the message producer only if instructed to. + if (producer) + { + createProducer(); + } + + // Create the message consumer only if instructed to. + if (consumer) + { + createReplyConsumers(getReplyDestinations(), _selector); + } + } + + /** + * Establishes a connection to the broker, based on the configuration parameters that this ping client was + * created with. + * + * @param clientID The clients identifier. + * + * @throws JMSException Underlying exceptions allowed to fall through. + * @throws NamingException Underlying exceptions allowed to fall through. + * @throws IOException Underlying exceptions allowed to fall through. + */ + protected void createConnection(String clientID) throws JMSException, NamingException, IOException + { + // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); + + // _log.debug("Creating a connection for the message producer."); + File propsFile = new File(_fileProperties); + InputStream is = new FileInputStream(propsFile); + Properties properties = new Properties(); + properties.load(is); + + Context context = new InitialContext(properties); + ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); + _connection = factory.createConnection(_username, _password); + + if (_overrideClientId) + { + _connection.setClientID(clientID); + } + + // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); + + _consumerConnection = new Connection[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i] = factory.createConnection(_username, _password); + // _consumerConnection[i].setClientID(clientID); + } + } + + /** + * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs + * to be started to bounce the pings back again. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + + // Create a ping producer overriding its defaults with all options passed on the command line. + PingPongProducer pingProducer = new PingPongProducer(options); + pingProducer.establishConnection(true, true); + + // Start the ping producers dispatch thread running. + pingProducer._connection.start(); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + pingProducer._connection.setExceptionListener(pingProducer); + + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. + Thread pingThread = new Thread(pingProducer); + pingThread.run(); + pingThread.join(); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Convenience method for a short pause. + * + * @param sleepTime The time in milliseconds to pause for. + */ + public static void pause(long sleepTime) + { + if (sleepTime > 0) + { + try + { + Thread.sleep(sleepTime); + } + catch (InterruptedException ie) + { } + } + } + + /** + * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to + * destination of this pinger. + * + * @return The single reply to destination of this pinger, wrapped in a list. + */ + public List getReplyDestinations() + { + // log.debug("public List getReplyDestinations(): called"); + + List replyDestinations = new ArrayList(); + replyDestinations.add(_replyDestination); + + // log.debug("replyDestinations = " + replyDestinations); + + return replyDestinations; + } + + /** + * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery + * flag is set accoring the ping producer creation options. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createProducer() throws JMSException + { + // log.debug("public void createProducer(): called"); + + _producer = (MessageProducer) _producerSession.createProducer(null); + _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + } + + /** + * Creates consumers for the specified number of destinations. The destinations themselves are also created by this + * method. + * + * @param noOfDestinations The number of destinations to create consumers for. + * @param selector The message selector to filter the consumers with. + * @param rootName The root of the name, or actual name if only one is being created. + * @param unique true to make the destinations unique to this pinger, false to share the + * numbering with all pingers on the same JVM. + * @param durable If the destinations are durable topics. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, + boolean durable) throws JMSException + { + /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " + + durable + "): called");*/ + + _pingDestinations = new ArrayList(); + + // Create the desired number of ping destinations and consumers for them. + // log.debug("Creating " + noOfDestinations + " destinations to ping."); + + for (int i = 0; i < noOfDestinations; i++) + { + Destination destination; + String id; + + // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. + if (unique) + { + // log.debug("Creating unique destinations."); + id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); + } + else + { + // log.debug("Creating shared destinations."); + id = "_" + _queueSharedID.incrementAndGet(); + } + + // Check if this is a pub/sub pinger, in which case create topics. + if (_isPubSub) + { + destination = _producerSession.createTopic(rootName + id); + // log.debug("Created non-durable topic " + destination); + + if (durable) + { + _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); + } + } + // Otherwise this is a p2p pinger, in which case create queues. + else + { + destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); + // log.debug("Created queue " + destination); + } + + // Keep the destination. + _pingDestinations.add(destination); + } + } + + /** + * Creates consumers for the specified destinations and registers this pinger to listen to their messages. + * + * @param destinations The destinations to listen to. + * @param selector A selector to filter the messages with. + * + * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. + */ + public void createReplyConsumers(Collection destinations, String selector) throws JMSException + { + /*log.debug("public void createReplyConsumers(Collection destinations = " + destinations + + ", String selector = " + selector + "): called");*/ + + log.debug("There are " + destinations.size() + " destinations."); + log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); + + for (Destination destination : destinations) + { + _consumer = new MessageConsumer[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); + + final int consumerNo = i; + + _consumer[i].setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + onMessageWithConsumerNo(message, consumerNo); + } + }); + + log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); + } + } + } + + /** + * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a + * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the + * replies map. + * + * @param message The received message. + * @param consumerNo The consumer number within this test pinger instance. + */ + public void onMessageWithConsumerNo(Message message, int consumerNo) + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); + try + { + long now = System.nanoTime(); + long timestamp = getTimestamp(message); + long pingTime = now - timestamp; + + // NDC.push("id" + instanceId + "/cons" + consumerNo); + + // Extract the messages correlation id. + String correlationID = message.getJMSCorrelationID(); + // log.debug("correlationID = " + correlationID); + + // int num = message.getIntProperty("MSG_NUM"); + // log.info("Message " + num + " received."); + + boolean isRedelivered = message.getJMSRedelivered(); + // log.debug("isRedelivered = " + isRedelivered); + + if (!isRedelivered) + { + // Countdown on the traffic light if there is one for the matching correlation id. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + if (perCorrelationId != null) + { + CountDownLatch trafficLight = perCorrelationId.trafficLight; + + // Restart the timeout timer on every message. + perCorrelationId.timeOutStart = System.nanoTime(); + + // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + + // Release waiting senders if there are some and using maxPending limit. + if ((_maxPendingSize > 0)) + { + // Decrement the count of sent but not yet received messages. + int unreceived = _unreceived.decrementAndGet(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + + // synchronized (_sendPauseMonitor) + // { + if (unreceivedSize < _maxPendingSize) + { + _sendPauseMonitor.poll(); + } + // } + } + + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount; + long remainingCount; + + synchronized (trafficLight) + { + trafficLight.countDown(); + + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; + + // NDC.push("/rem" + remainingCount); + + // log.debug("remainingCount = " + remainingCount); + // log.debug("trueCount = " + trueCount); + + // Commit on transaction batch size boundaries. At this point in time the waiting producer + // remains blocked, even on the last message. + // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on + // each batch boundary. For pub/sub each consumer gets every message so no division is done. + // When running in client ack mode, an ack is done instead of a commit, on the commit batch + // size boundaries. + long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // log.debug("commitCount = " + commitCount); + + if ((commitCount % _txBatchSize) == 0) + { + if (_consAckMode == 2) + { + // log.debug("Doing client ack for consumer " + consumerNo + "."); + message.acknowledge(); + } + else + { + // log.debug("Trying commit for consumer " + consumerNo + "."); + commitTx(_consumerSession[consumerNo]); + // log.info("Tx committed on consumer " + consumerNo); + } + } + + // Forward the message and remaining count to any interested chained message listener. + if (_chainedMessageListener != null) + { + _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); + } + + // Check if this is the last message, in which case release any waiting producers. This is done + // after the transaction has been committed and any listeners notified. + if (trueCount == 1) + { + trafficLight.countDown(); + } + } + } + else + { + log.warn("Got unexpected message with correlationId: " + correlationID); + } + } + else + { + log.warn("Got redelivered message, ignoring."); + } + } + catch (JMSException e) + { + log.warn("There was a JMSException: " + e.getMessage(), e); + } + finally + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + // NDC.clear(); + } + } + + /** + * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out + * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify + * the correlation id. + * + * @param message The message to send. If this is null, one is generated. + * @param numPings The number of ping messages to send. + * @param timeout The timeout in milliseconds. + * @param messageCorrelationId The message correlation id. If this is null, one is generated. + * + * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait + * for all prematurely. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout + */ + public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + + // Generate a unique correlation id to put on the messages before sending them, if one was not specified. + if (messageCorrelationId == null) + { + messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); + } + + try + { + // NDC.push("prod"); + + // Create a count down latch to count the number of replies with. This is created before the messages are + // sent so that the replies cannot be received before the count down is created. + // One is added to this, so that the last reply becomes a special case. The special case is that the + // chained message listener must be called before this sender can be unblocked, but that decrementing the + // countdown needs to be done before the chained listener can be called. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Set up the current time as the start time for pinging on the correlation id. This is used to determine + // timeouts. + perCorrelationId.timeOutStart = System.nanoTime(); + + // Send the specifed number of messages. + pingNoWaitForReply(message, numPings, messageCorrelationId); + + boolean timedOut; + boolean allMessagesReceived; + int numReplies; + + do + { + // Block the current thread until replies to all the messages are received, or it times out. + perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); + + // Work out how many replies were receieved. + numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + + allMessagesReceived = numReplies == getExpectedNumPings(numPings); + + // log.debug("numReplies = " + numReplies); + // log.debug("allMessagesReceived = " + allMessagesReceived); + + // Recheck the timeout condition. + long now = System.nanoTime(); + long lastMessageReceievedAt = perCorrelationId.timeOutStart; + timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); + + // log.debug("now = " + now); + // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + } + while (!timedOut && !allMessagesReceived); + + if ((numReplies < getExpectedNumPings(numPings)) && _verbose) + { + log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + } + else if (_verbose) + { + log.info("Got all replies on id, " + messageCorrelationId); + } + + // commitTx(_consumerSession); + + // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + + return numReplies; + } + // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, + // so will be a memory leak if this is not done. + finally + { + // NDC.pop(); + perCorrelationIds.remove(messageCorrelationId); + } + } + + /** + * Sends the specified number of ping messages and does not wait for correlating replies. + * + * @param message The message to send. + * @param numPings The number of pings to send. + * @param messageCorrelationId A correlation id to place on all messages sent. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException + { + /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + + if (message == null) + { + message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + } + + message.setJMSCorrelationID(messageCorrelationId); + + // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the + // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is + // needed. + boolean committed = false; + + // Send all of the ping messages. + for (int i = 0; i < numPings; i++) + { + // Re-timestamp the message. + // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + // Send the message, passing in the message count. + committed = sendMessage(i, message); + + // Spew out per message timings on every message sonly in verbose mode. + /*if (_verbose) + { + log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); + }*/ + } + + // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. + if (!committed) + { + commitTx(_producerSession); + } + } + + /** + * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of + * messages sent so far must be specified and is used to round robin the ping destinations (where there are more + * than one), and to determine if the transaction batch size has been reached and the sent messages should be + * committed. + * + * @param i The count of messages sent so far in a loop of multiple calls to this send method. + * @param message The message to send. + * + * @return true if the messages were committed, false otherwise. + * + * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. + */ + protected boolean sendMessage(int i, Message message) throws JMSException + { + try + { + NDC.push("id" + instanceId + "/prod"); + + // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + // log.debug("_txBatchSize = " + _txBatchSize); + + // Round robin the destinations as the messages are sent. + Destination destination = _pingDestinations.get(i % _pingDestinations.size()); + + // Prompt the user to kill the broker when doing failover testing. + _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); + + // Get the test setup for the correlation id. + String correlationID = message.getJMSCorrelationID(); + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + // If necessary, wait until the max pending message size comes within its limit. + if (_maxPendingSize > 0) + { + synchronized (_sendPauseMonitor) + { + // Used to keep track of the number of times that send has to wait. + int numWaits = 0; + + // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with + // the test timeout. + int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); + + while (true) + { + // Get the size estimate of sent but not yet received messages. + int unreceived = _unreceived.get(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + // log.debug("_maxPendingSize = " + _maxPendingSize); + + if (unreceivedSize > _maxPendingSize) + { + // log.debug("unreceived size estimate over limit = " + unreceivedSize); + + // Fail the test if the send has had to wait more than the maximum allowed number of times. + if (numWaits > waitLimit) + { + String errorMessage = + "Send has had to wait for the unreceivedSize (" + unreceivedSize + + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit + + " times."; + log.warn(errorMessage); + throw new RuntimeException(errorMessage); + } + + // Wait on the send pause barrier for the limit to be re-established. + try + { + long start = System.nanoTime(); + // _sendPauseMonitor.wait(10000); + _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); + long end = System.nanoTime(); + + // Count the wait only if it was for > 99% of the requested wait time. + if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) + { + numWaits++; + } + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + else + { + break; + } + } + } + } + + // Send the message either to its round robin destination, or its default destination. + // int num = numSent.incrementAndGet(); + // message.setIntProperty("MSG_NUM", num); + setTimestamp(message); + + if (destination == null) + { + _producer.send(message); + } + else + { + _producer.send(destination, message); + } + + // Increase the unreceived size, this may actually happen after the message is received. + // The unreceived size is incremented by the number of consumers that will get a copy of the message, + // in pub/sub mode. + if (_maxPendingSize > 0) + { + int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + // log.debug("newUnreceivedCount = " + newUnreceivedCount); + } + + // Apply message rate throttling if a rate limit has been set up. + if (_rateLimiter != null) + { + _rateLimiter.throttle(); + } + + // Call commit every time the commit batch size is reached. + boolean committed = false; + + // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. + if (((i + 1) % _txBatchSize) == 0) + { + // log.debug("Trying commit on producer session."); + committed = commitTx(_producerSession); + } + + return committed; + } + finally + { + NDC.clear(); + } + } + + /** + * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the + * test that the failure has occurred, before the method returns. + * + * @param failFlag The fail flag to test. + * + * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only + * used once, then reset. + */ + private boolean waitForUserToPromptOnFailure(boolean failFlag) + { + if (failFlag) + { + if (_failOnce) + { + failFlag = false; + } + + // log.debug("Failing Before Send"); + waitForUser(KILL_BROKER_PROMPT); + } + + return failFlag; + } + + /** + * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch + * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will + * terminate the pinger. + */ + public void pingLoop() + { + try + { + // Generate a sample message and time stamp it. + Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); + // setTimestamp(msg); + + // Send the message and wait for a reply. + pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); + } + catch (JMSException e) + { + _publish = false; + // log.debug("There was a JMSException: " + e.getMessage(), e); + } + catch (InterruptedException e) + { + _publish = false; + // log.debug("There was an interruption: " + e.getMessage(), e); + } + } + + /** + * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set + * here. + * + * @param messageListener The chained message listener. + */ + public void setChainedMessageListener(ChainedMessageListener messageListener) + { + _chainedMessageListener = messageListener; + } + + /** Removes any chained message listeners from this pinger. */ + public void removeChainedMessageListener() + { + _chainedMessageListener = null; + } + + /** + * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. + * + * @param replyQueue The reply-to destination for the message. + * @param messageSize The desired size of the message in bytes. + * @param persistent true if the message should use persistent delivery, false otherwise. + * + * @return A freshly generated test message. + * + * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. + */ + public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException + { + // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); + return TestUtils.createTestMessageOfSize(_producerSession, messageSize); + } + + /** + * Sets the current time in nanoseconds as the timestamp on the message. + * + * @param msg The message to timestamp. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + protected void setTimestamp(Message msg) throws JMSException + { + /*if (((AMQSession)_producerSession).isStrictAMQP()) + { + ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); + } + else + {*/ + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + // } + } + + /** + * Extracts the nanosecond timestamp from a message. + * + * @param msg The message to extract the time stamp from. + * + * @return The timestamp in nanos. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + protected long getTimestamp(Message msg) throws JMSException + { + /*if (((AMQSession)_producerSession).isStrictAMQP()) + { + Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); + + return (value == null) ? 0L : value; + } + else + {*/ + return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + // } + } + + /** + * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag + * has been cleared. + */ + public void stop() + { + _publish = false; + } + + /** + * Starts the producer and consumer connections. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void start() throws JMSException + { + // log.debug("public void start(): called"); + + _connection.start(); + // log.debug("Producer started."); + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i].start(); + // log.debug("Consumer " + i + " started."); + } + } + + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ + public void run() + { + // Keep running until the publish flag is cleared. + while (_publish) + { + pingLoop(); + } + } + + /** + * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the + * connection, this clears the publish flag which in turn will halt the ping loop. + * + * @param e The exception that triggered this callback method. + */ + public void onException(JMSException e) + { + // log.debug("public void onException(JMSException e = " + e + "): called", e); + _publish = false; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered + * with the runtime system as a shutdown hook. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + } + }); + } + + /** + * Closes all of the producer and consumer connections. + * + * @throws JMSException All JMSException are allowed to fall through. + */ + public void close() throws JMSException + { + // log.debug("public void close(): called"); + + try + { + if (_connection != null) + { + // log.debug("Before close producer connection."); + _connection.close(); + // log.debug("Closed producer connection."); + } + + for (int i = 0; i < _noOfConsumers; i++) + { + if (_consumerConnection[i] != null) + { + // log.debug("Before close consumer connection " + i + "."); + _consumerConnection[i].close(); + // log.debug("Closed consumer connection " + i + "."); + } + } + } + finally + { + _connection = null; + _producerSession = null; + _consumerSession = null; + _consumerConnection = null; + _producer = null; + _consumer = null; + _pingDestinations = null; + _replyDestination = null; + } + } + + /** + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a + * transactional controlSession, this method does nothing (unless the failover after send flag is set). + * + *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is + * applied. This flag applies whether the pinger is transactional or not. + * + *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit + * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the + * commit is applied. These flags will only apply if using a transactional pinger. + * + * @param session The controlSession to commit + * + * @return true if the controlSession was committed, false if it was not. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * + * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional and + * non-transactional alike. + */ + protected boolean commitTx(Session session) throws JMSException + { + // log.debug("protected void commitTx(Session session): called"); + + boolean committed = false; + + _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); + + if (session.getTransacted()) + { + // log.debug("Session is transacted."); + + try + { + _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); + + long start = System.nanoTime(); + session.commit(); + committed = true; + // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); + + _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); + + // log.debug("Session Commited."); + } + catch (JMSException e) + { + // log.debug("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + // log.debug("Message rolled back."); + } + catch (JMSException jmse) + { + // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + + return committed; + } + + /** + * Outputs a prompt to the console and waits for the user to press return. + * + * @param prompt The prompt to display on the console. + */ + public void waitForUser(String prompt) + { + System.out.println(prompt); + + try + { + System.in.read(); + } + catch (IOException e) + { + // Ignored. + } + + System.out.println("Continuing."); + } + + /** + * Gets the number of consumers that are listening to each destination in the test. + * + * @return int The number of consumers subscribing to each topic. + */ + public int getConsumersPerDestination() + { + return _noOfConsumers; + } + + /** + * Calculates how many pings are expected to be received for the given number sent. + * + * @param numpings The number of pings that will be sent. + * + * @return The number that should be received, for the test to pass. + */ + public int getExpectedNumPings(int numpings) + { + // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); + + // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); + + return numpings * (_isPubSub ? getConsumersPerDestination() : 1); + } + + /** + * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link + * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link + * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of + * messages with that correlation id. + * + *

Provided only one pinger is producing messages with that correlation id, the chained listener will always be + * given unique message counts. It will always be called while the producer waiting for all messages to arrive is + * still blocked. + */ + public static interface ChainedMessageListener + { + /** + * Notifies interested listeners about message arrival and important test stats, the number of messages + * remaining in the test, and the messages send timestamp. + * + * @param message The newly arrived message. + * @param remainingCount The number of messages left to complete the test. + * @param latency The nanosecond latency of the message. + * + * @throws JMSException Any JMS exceptions is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException; + } + + /** + * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be + * added to this: read/write lock to make onMessage more concurrent as described in class header comment. + */ + protected static class PerCorrelationId + { + /** Holds a countdown on number of expected messages. */ + CountDownLatch trafficLight; + + /** Holds the last timestamp that the timeout was reset to. */ + Long timeOutStart; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java index 2610b32220..009254c612 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -1,251 +1,251 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.junit.extensions.AsymptoticTestCase; -import org.apache.qpid.junit.extensions.util.ParsedProperties; -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -/** - * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run - * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from - * a producer to a conumer, then the consumer replies to the message on a temporary queue. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number - * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled - * up using a suitable JUnit test runner. See {@link org.apache.qpid.junit.extensions.TKTestRunner} for more - * information on how to do this. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads - * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come - * back on the temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingPongTestPerf extends AsymptoticTestCase -{ - private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in - // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. - // private Properties testParameters = System.getProperties(); - private ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingPongTestPerf(String name) - { - super(name); - - _logger.debug(testParameters); - - // Sets up the test parameters with defaults. - /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, - Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); - testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, - Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, - Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, - Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, - Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, - Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ - } - - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingPongTestPerf("testPingPongOk")); - - return suite; - } - - private static void setSystemPropertyIfNull(String propName, String propValue) - { - if (System.getProperty(propName) == null) - { - System.setProperty(propName, propValue); - } - } - - public void testPingPongOk(int numPings) throws Exception - { - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the message and wait for a reply. - int numReplies = - perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != numPings) - { - Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); - String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); - String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); - String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); - String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); - boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); - boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); - String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); - boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); - boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); - - synchronized (this) - { - // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = - new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, - transacted, selector, verbose, pubsub); - - // Start the connections for client and producer running. - perThreadSetup._testPingBouncer.getConnection().start(); - - // Establish a ping-pong client on the ping queue to send the pings and receive replies with. - perThreadSetup._testPingProducer = new PingPongProducer(testParameters); - perThreadSetup._testPingProducer.establishConnection(true, true); - perThreadSetup._testPingProducer.start(); - } - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - perThreadSetup._testPingProducer.close(); - // perThreadSetup._testPingBouncer.close(); - } - - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping-pong producer. - */ - private PingPongProducer _testPingProducer; - - /** - * Holds the test ping client. - */ - private PingPongBouncer _testPingBouncer; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.junit.extensions.AsymptoticTestCase; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run + * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from + * a producer to a conumer, then the consumer replies to the message on a temporary queue. + * + *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number + * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled + * up using a suitable JUnit test runner. See {@link org.apache.qpid.junit.extensions.TKTestRunner} for more + * information on how to do this. + * + *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads + * gets its own connection/producer/consumer, this is only re-established if the connection is lost. + * + *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come + * back on the temporary queue. + * + *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingPongTestPerf extends AsymptoticTestCase +{ + private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in + // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner + // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. + // private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingPongTestPerf(String name) + { + super(name); + + _logger.debug(testParameters); + + // Sets up the test parameters with defaults. + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); + testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.PING_QUEUE_NAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, + Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, + Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, + Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingPongTestPerf("testPingPongOk")); + + return suite; + } + + private static void setSystemPropertyIfNull(String propName, String propValue) + { + if (System.getProperty(propName) == null) + { + System.setProperty(propName, propValue); + } + } + + public void testPingPongOk(int numPings) throws Exception + { + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the message and wait for a reply. + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != numPings) + { + Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); + + synchronized (this) + { + // Establish a bounce back client on the ping queue to bounce back the pings. + perThreadSetup._testPingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + transacted, selector, verbose, pubsub); + + // Start the connections for client and producer running. + perThreadSetup._testPingBouncer.getConnection().start(); + + // Establish a ping-pong client on the ping queue to send the pings and receive replies with. + perThreadSetup._testPingProducer = new PingPongProducer(testParameters); + perThreadSetup._testPingProducer.establishConnection(true, true); + perThreadSetup._testPingProducer.start(); + } + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + // perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping-pong producer. + */ + private PingPongProducer _testPingProducer; + + /** + * Holds the test ping client. + */ + private PingPongBouncer _testPingBouncer; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java index f699295b06..0fcb0a8538 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java @@ -1,199 +1,199 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.testcases; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; -import org.apache.log4j.NDC; - -import org.apache.qpid.test.framework.Assertion; -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import org.apache.qpid.junit.extensions.TestThreadAware; -import org.apache.qpid.junit.extensions.TimingController; -import org.apache.qpid.junit.extensions.TimingControllerAware; -import org.apache.qpid.junit.extensions.util.ParsedProperties; -import org.apache.qpid.junit.extensions.util.TestContextProperties; - -import java.util.LinkedList; - -/** - * MessageThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of - * the time required to receive samples consisting of batches of messages. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Measure message throughput accross a test circuit. {@link Circuit} - *
- * - * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as - * the publishing end. - * - * @todo Set this up to run with zero sized tests. Size zero means send forever. Continuous sending to be interrupted - * by completion of the test duration, or shutdown hook when the user presses Ctrl-C. - */ -public class MessageThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(MessageThroughputPerf.class); - - /** Holds the timing controller, used to log test timings from self-timed tests. */ - private TimingController timingController; - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public MessageThroughputPerf(String name) - { - super(name); - } - - /** - * Performs the a basic P2P test case. - * - * @param numMessages The number of messages to send in the test. - */ - public void testThroughput(int numMessages) - { - log.debug("public void testThroughput(): called"); - - PerThreadSetup setup = threadSetup.get(); - assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList())); - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as known to the test - * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test - * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case - * name "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called"); - - return "DEFAULT_CIRCUIT_TEST"; - } - - /** - * Used by test runners that can supply a {@link org.apache.qpid.junit.extensions.TimingController} to set the - * controller on an aware test. - * - * @param controller The timing controller. - */ - public void setTimingController(TimingController controller) - { - timingController = controller; - } - - /** - * Overrides the parent setUp method so that the in-vm broker creation is not done on a per test basis. - * - * @throws Exception Any exceptions allowed to fall through and fail the test. - */ - protected void setUp() throws Exception - { - NDC.push(getName()); - - testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - } - - /** - * Overrides the parent setUp method so that the in-vm broker clean-up is not done on a per test basis. - */ - protected void tearDown() - { - NDC.pop(); - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - // Run the test setup tasks. This may create an in-vm broker, if a decorator has injected a task for this. - taskHandler.runSetupTasks(); - - // Get the test parameters, any overrides on the command line will have been applied. - ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - - // Customize the test parameters. - testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); - testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); - - // Get the test circuit factory to create test circuits and run the standard test procedure through. - CircuitFactory circuitFactory = getCircuitFactory(); - - // Create the test circuit. This projects the circuit onto the available test nodes and connects it up. - Circuit testCircuit = circuitFactory.createCircuit(testProps); - - // Store the test configuration for the thread. - PerThreadSetup setup = new PerThreadSetup(); - setup.testCircuit = testCircuit; - threadSetup.set(setup); - } - - /** - * Called when a test thread is destroyed. - */ - public void threadTearDown() - { - // Run the test teardown tasks. This may destroy the in-vm broker, if a decorator has injected a task for this. - taskHandler.runSetupTasks(); - } - - /** - * Holds the per-thread test configurations. - */ - protected static class PerThreadSetup - { - /** Holds the test circuit to run tests on. */ - Circuit testCircuit; - } - - /** - * Compiles all the tests in this class into a suite. - * - * @return The test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Qpid Throughput Performance Tests"); - - suite.addTest(new MessageThroughputPerf("testThroughput")); - - return suite; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.testcases; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; + +import org.apache.qpid.junit.extensions.TestThreadAware; +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import java.util.LinkedList; + +/** + * MessageThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of + * the time required to receive samples consisting of batches of messages. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Measure message throughput accross a test circuit. {@link Circuit} + *
+ * + * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as + * the publishing end. + * + * @todo Set this up to run with zero sized tests. Size zero means send forever. Continuous sending to be interrupted + * by completion of the test duration, or shutdown hook when the user presses Ctrl-C. + */ +public class MessageThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(MessageThroughputPerf.class); + + /** Holds the timing controller, used to log test timings from self-timed tests. */ + private TimingController timingController; + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + /** + * Creates a new test case with the specified name. + * + * @param name The test case name. + */ + public MessageThroughputPerf(String name) + { + super(name); + } + + /** + * Performs the a basic P2P test case. + * + * @param numMessages The number of messages to send in the test. + */ + public void testThroughput(int numMessages) + { + log.debug("public void testThroughput(): called"); + + PerThreadSetup setup = threadSetup.get(); + assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList())); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called"); + + return "DEFAULT_CIRCUIT_TEST"; + } + + /** + * Used by test runners that can supply a {@link org.apache.qpid.junit.extensions.TimingController} to set the + * controller on an aware test. + * + * @param controller The timing controller. + */ + public void setTimingController(TimingController controller) + { + timingController = controller; + } + + /** + * Overrides the parent setUp method so that the in-vm broker creation is not done on a per test basis. + * + * @throws Exception Any exceptions allowed to fall through and fail the test. + */ + protected void setUp() throws Exception + { + NDC.push(getName()); + + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + } + + /** + * Overrides the parent setUp method so that the in-vm broker clean-up is not done on a per test basis. + */ + protected void tearDown() + { + NDC.pop(); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + // Run the test setup tasks. This may create an in-vm broker, if a decorator has injected a task for this. + taskHandler.runSetupTasks(); + + // Get the test parameters, any overrides on the command line will have been applied. + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + // Customize the test parameters. + testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); + testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); + + // Get the test circuit factory to create test circuits and run the standard test procedure through. + CircuitFactory circuitFactory = getCircuitFactory(); + + // Create the test circuit. This projects the circuit onto the available test nodes and connects it up. + Circuit testCircuit = circuitFactory.createCircuit(testProps); + + // Store the test configuration for the thread. + PerThreadSetup setup = new PerThreadSetup(); + setup.testCircuit = testCircuit; + threadSetup.set(setup); + } + + /** + * Called when a test thread is destroyed. + */ + public void threadTearDown() + { + // Run the test teardown tasks. This may destroy the in-vm broker, if a decorator has injected a task for this. + taskHandler.runSetupTasks(); + } + + /** + * Holds the per-thread test configurations. + */ + protected static class PerThreadSetup + { + /** Holds the test circuit to run tests on. */ + Circuit testCircuit; + } + + /** + * Compiles all the tests in this class into a suite. + * + * @return The test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Qpid Throughput Performance Tests"); + + suite.addTest(new MessageThroughputPerf("testThroughput")); + + return suite; + } +} -- cgit v1.2.1