summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src/main
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-04-05 16:04:10 +0000
committerRobert Gemmell <robbie@apache.org>2012-04-05 16:04:10 +0000
commiteb58a828676bf1fad33c317f86a909f18cadacbc (patch)
treeaef0634df4867e761ea65792d5e19eacbc029f49 /qpid/java/perftests/src/main
parent347ee6bc0e71ca79d729ccf53c134fbe01289621 (diff)
downloadqpid-python-eb58a828676bf1fad33c317f86a909f18cadacbc.tar.gz
QPID-3936: initial checkin of new testing framework, initially to be used for performance testing but later to be expanded for use with other testing scenarios.
Applied patch from Philip Harvey <phil@philharveyonline.com>, Oleksandr Rudyy <orudyy@gmail.com>, Andrew MacBean <andymacbean@gmail.com>, Keith Wall <kwall@apache.org>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1309918 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java74
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java44
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java94
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java179
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java33
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java66
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java76
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java204
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java98
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java25
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java235
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java212
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java31
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java132
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java45
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java94
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java158
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java27
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java122
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java179
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java27
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java46
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java47
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java129
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java79
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java52
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java46
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java31
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java227
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java94
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java49
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java70
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java307
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java31
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java113
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java81
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java52
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java91
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java65
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java86
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java60
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java64
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java90
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java69
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java114
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java117
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java102
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java575
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java210
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java124
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java110
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java31
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java43
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java147
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java56
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java39
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java118
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java58
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java108
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java54
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java96
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java106
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java28
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java62
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java30
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java35
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java67
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java89
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java231
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java100
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java43
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java80
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java29
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java28
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java29
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java97
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java49
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java36
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java124
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java98
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java89
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java55
-rw-r--r--qpid/java/perftests/src/main/resources/perftests.properties20
84 files changed, 7829 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java
new file mode 100644
index 0000000000..9e865010f8
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+import java.io.FileInputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class AbstractRunner
+{
+ public static final String JNDI_CONFIG_PROP = "jndi-config";
+ public static final String JNDI_CONFIG_DEFAULT = "perftests-jndi.properties";
+
+ private Map<String,String> _cliOptions = new HashMap<String, String>();
+ {
+ getCliOptions().put(JNDI_CONFIG_PROP, JNDI_CONFIG_DEFAULT);
+ }
+
+ protected Context getContext()
+ {
+ Context context = null;
+
+ try
+ {
+ final Properties properties = new Properties();
+ properties.load(new FileInputStream(getJndiConfig()));
+ context = new InitialContext(properties);
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Exception while loading JNDI properties", e);
+ }
+
+ return context;
+ }
+
+ public void parseArgumentsIntoConfig(String[] args)
+ {
+ ArgumentParser argumentParser = new ArgumentParser();
+ argumentParser.parseArgumentsIntoConfig(getCliOptions(), args);
+ }
+
+ protected String getJndiConfig()
+ {
+ return getCliOptions().get(AbstractRunner.JNDI_CONFIG_PROP);
+ }
+
+ protected Map<String,String> getCliOptions()
+ {
+ return _cliOptions;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java
new file mode 100644
index 0000000000..8c1f8675e3
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+import java.util.Map;
+
+public class ArgumentParser
+{
+ public void parseArgumentsIntoConfig(Map<String, String> initialValues, String[] args)
+ {
+ for(String arg: args)
+ {
+ String[] splitArg = arg.split("=");
+ if(splitArg.length != 2)
+ {
+ throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg);
+ }
+
+ if(initialValues.put(splitArg[0], splitArg[1]) == null)
+ {
+ throw new IllegalArgumentException("not a valid configuration property: " + arg);
+ }
+ }
+
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java
new file mode 100644
index 0000000000..9c2f4a3d95
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientRunner extends AbstractRunner
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientRunner.class);
+
+ public static final String NUM_OF_CLIENTS_PROP = "number-of-clients";
+
+ public static final String NUM_OF_CLIENTS_DEFAULT = "1";
+
+ public ClientRunner()
+ {
+ getCliOptions().put(NUM_OF_CLIENTS_PROP, NUM_OF_CLIENTS_DEFAULT);
+ }
+
+ public static void main(String[] args)
+ {
+ ClientRunner runner = new ClientRunner();
+ runner.parseArgumentsIntoConfig(args);
+ runner.runClients();
+ }
+
+ public void setJndiPropertiesFileLocation(String jndiConfigFileLocation)
+ {
+ getCliOptions().put(JNDI_CONFIG_PROP, jndiConfigFileLocation);
+ }
+
+ /**
+ * Run the clients in the background
+ */
+ public void runClients()
+ {
+ int numClients = Integer.parseInt(getCliOptions().get(NUM_OF_CLIENTS_PROP));
+
+ Context context = getContext();
+
+ for(int i = 1; i <= numClients; i++)
+ {
+ createBackgroundClient(context);
+ }
+ }
+
+ private void createBackgroundClient(Context context)
+ {
+ try
+ {
+ final Client client = new Client(new ClientJmsDelegate(context));
+
+ final Thread clientThread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ LOGGER.info("Starting client " + client.getClientName());
+ client.start();
+ client.waitUntilStopped();
+ LOGGER.info("Stopped client " + client.getClientName());
+ }
+ });
+ clientThread.start();
+ }
+ catch (NamingException e)
+ {
+ throw new DistributedTestException("Exception while creating client instance", e);
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
new file mode 100644
index 0000000000..07d78790f0
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import javax.naming.Context;
+
+import org.apache.qpid.disttest.controller.Controller;
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
+import org.apache.qpid.disttest.controller.config.Config;
+import org.apache.qpid.disttest.controller.config.ConfigReader;
+import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
+import org.apache.qpid.disttest.results.aggregation.Aggregator;
+import org.apache.qpid.disttest.results.formatting.CSVFormater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ControllerRunner extends AbstractRunner
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRunner.class);
+
+ public static final String TEST_CONFIG_PROP = "test-config";
+ public static final String DISTRIBUTED_PROP = "distributed";
+ public static final String OUTPUT_FILE_PROP = "outputfile";
+
+ private static final String TEST_CONFIG_DEFAULT = "perftests-config.json";
+ private static final String DISTRIBUTED_DEFAULT = "false";
+ private static final String OUTPUT_FILE_DEFAULT = "output.csv";
+
+ private Controller _controller;
+
+ private final Aggregator _aggregator = new Aggregator();
+
+
+ public ControllerRunner()
+ {
+ getCliOptions().put(TEST_CONFIG_PROP, TEST_CONFIG_DEFAULT);
+ getCliOptions().put(DISTRIBUTED_PROP, DISTRIBUTED_DEFAULT);
+ getCliOptions().put(OUTPUT_FILE_PROP, OUTPUT_FILE_DEFAULT);
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ ControllerRunner runner = new ControllerRunner();
+ runner.parseArgumentsIntoConfig(args);
+ runner.runController();
+ }
+
+ public void runController() throws Exception
+ {
+ Context context = getContext();
+
+ ControllerJmsDelegate jmsDelegate = new ControllerJmsDelegate(context);
+
+ try
+ {
+ runTests(jmsDelegate);
+ }
+ finally
+ {
+ jmsDelegate.closeConnections();
+ }
+ }
+
+ private void runTests(ControllerJmsDelegate jmsDelegate)
+ {
+
+ _controller = new Controller(jmsDelegate,
+ DistributedTestConstants.REGISTRATION_TIMEOUT, DistributedTestConstants.COMMAND_RESPONSE_TIMEOUT);
+
+ Config testConfig = getTestConfig();
+ _controller.setConfig(testConfig);
+
+ if(!isDistributed())
+ {
+ //we must create the required test clients, running in single-jvm mode
+ int numClients = testConfig.getTotalNumberOfClients();
+ for (int i = 1; i <= numClients; i++)
+ {
+ ClientRunner clientRunner = new ClientRunner();
+ clientRunner.setJndiPropertiesFileLocation(getJndiConfig());
+ clientRunner.runClients();
+ }
+ }
+
+ ResultsForAllTests resultsForAllTests = null;
+ try
+ {
+ _controller.awaitClientRegistrations();
+
+ ResultsForAllTests rawResultsForAllTests = _controller.runAllTests();
+ resultsForAllTests = _aggregator.aggregateResults(rawResultsForAllTests);
+ }
+ finally
+ {
+ _controller.stopAllRegisteredClients();
+ }
+
+ final String outputFile = getOutputFile();
+ writeResultsToFile(resultsForAllTests, outputFile);
+ }
+
+ private void writeResultsToFile(ResultsForAllTests resultsForAllTests, String outputFile)
+ {
+ FileWriter writer = null;
+ try
+ {
+ final String outputCsv = new CSVFormater().format(resultsForAllTests);
+ writer = new FileWriter(outputFile);
+ writer.write(outputCsv);
+ LOGGER.info("Wrote " + resultsForAllTests.getTestResults().size() + " test result(s) to output file " + outputFile);
+ }
+ catch (IOException e)
+ {
+ throw new DistributedTestException("Unable to write output file " + outputFile, e);
+ }
+ finally
+ {
+ if (writer != null)
+ {
+ try
+ {
+ writer.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Failed to close stream for file " + outputFile, e);
+ }
+ }
+ }
+ }
+
+ private String getOutputFile()
+ {
+ return String.valueOf(getCliOptions().get(ControllerRunner.OUTPUT_FILE_PROP));
+ }
+
+ private Config getTestConfig()
+ {
+ ConfigReader configReader = new ConfigReader();
+ Config testConfig;
+ try
+ {
+ testConfig = configReader.getConfigFromFile(getCliOptions().get(ControllerRunner.TEST_CONFIG_PROP));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new DistributedTestException("Exception while loading test config", e);
+ }
+ return testConfig;
+ }
+
+ private boolean isDistributed()
+ {
+ return Boolean.valueOf(getCliOptions().get(ControllerRunner.DISTRIBUTED_PROP));
+ }
+
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java
new file mode 100644
index 0000000000..c4892edca9
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+public abstract class DistributedTestConstants
+{
+ public static final String PERF_TEST_PROPERTIES_FILE = "perftests.properties";
+
+ public static final String MSG_COMMAND_PROPERTY = "COMMAND";
+ public static final String MSG_JSON_PROPERTY = "JSON";
+
+ public static final long REGISTRATION_TIMEOUT = 60000;
+ public static final long COMMAND_RESPONSE_TIMEOUT = 10000;
+
+ public static final String CONTROLLER_QUEUE_JNDI_NAME = "controllerqueue";
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java
new file mode 100644
index 0000000000..d1d24dcfff
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+import org.apache.qpid.disttest.message.Command;
+
+public class DistributedTestException extends RuntimeException
+{
+ private static final long serialVersionUID = 1L;
+ private final Command causeCommand;
+
+ public DistributedTestException(final String message)
+ {
+ this(message, (Command) null);
+ }
+
+ public DistributedTestException(final Throwable cause)
+ {
+ this(cause, null);
+ }
+
+ public DistributedTestException(final String message, final Throwable cause)
+ {
+ this(message, cause, null);
+ }
+
+ public DistributedTestException(final String message, final Command commandCause)
+ {
+ super(message);
+ causeCommand = commandCause;
+ }
+
+ public DistributedTestException(final Throwable cause, final Command commandCause)
+ {
+ super(cause);
+ causeCommand = commandCause;
+ }
+
+ public DistributedTestException(final String message, final Throwable cause, final Command commandCause)
+ {
+ super(message, cause);
+ causeCommand = commandCause;
+ }
+
+ public Command getCauseCommand()
+ {
+ return causeCommand;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java
new file mode 100644
index 0000000000..52dad9aa4d
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * A variation of the visitor pattern that uses reflection to call the correct
+ * visit method. By convention, subclasses should provide public
+ * <pre>visit(SpecificClass)</pre> methods.
+ */
+public abstract class Visitor
+{
+
+ private static final String VISITOR_METHOD_NAME = "visit";
+
+ public void visit(Object targetObject)
+ {
+ Class<? extends Object> targetObjectClass = targetObject.getClass();
+ final Method method = findVisitMethodForTargetObjectClass(targetObjectClass);
+ invokeVisitMethod(targetObject, method);
+ }
+
+ private Method findVisitMethodForTargetObjectClass(
+ Class<? extends Object> targetObjectClass)
+ {
+ final Method method;
+ try
+ {
+ method = getClass().getDeclaredMethod(VISITOR_METHOD_NAME, targetObjectClass);
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to find method " + VISITOR_METHOD_NAME + " on object of class " + targetObjectClass, e);
+ }
+ return method;
+ }
+
+ private void invokeVisitMethod(Object targetObject, final Method method)
+ {
+ try
+ {
+ method.invoke(this, targetObject);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new DistributedTestException(e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new DistributedTestException(e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new DistributedTestException(e.getCause());
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
new file mode 100644
index 0000000000..ed29182c51
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.naming.NamingException;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.Visitor;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Client
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
+
+ private final ClientJmsDelegate _clientJmsDelegate;
+
+ private final CountDownLatch _latch = new CountDownLatch(1);
+ private Visitor _visitor;
+ private final AtomicReference<ClientState> _state;
+ private ParticipantExecutorRegistry _participantRegistry = new ParticipantExecutorRegistry();
+
+ public Client(final ClientJmsDelegate delegate) throws NamingException
+ {
+ _clientJmsDelegate = delegate;
+ _state = new AtomicReference<ClientState>(ClientState.CREATED);
+ _visitor = new ClientCommandVisitor(this, _clientJmsDelegate);
+ }
+
+ /**
+ * Register with the controller
+ */
+ public void start()
+ {
+ _clientJmsDelegate.setInstructionListener(this);
+ _clientJmsDelegate.sendRegistrationMessage();
+ _state.set(ClientState.READY);
+ }
+
+ public void stop()
+ {
+ _state.set(ClientState.STOPPED);
+ _latch.countDown();
+ }
+
+ public void addParticipantExecutor(final ParticipantExecutor participant)
+ {
+ _participantRegistry.add(participant);
+ }
+
+ public void waitUntilStopped()
+ {
+ waitUntilStopped(0);
+ }
+
+ public void waitUntilStopped(final long timeout)
+ {
+ try
+ {
+ if (timeout == 0)
+ {
+ _latch.await();
+ }
+ else
+ {
+ _latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ catch (final InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ _clientJmsDelegate.destroy();
+ }
+
+ public void processInstruction(final Command command)
+ {
+ String responseMessage = null;
+ try
+ {
+ command.accept(_visitor);
+ }
+ catch (final Exception e)
+ {
+ LOGGER.error("Error processing instruction", e);
+ responseMessage = e.getMessage();
+ }
+ finally
+ {
+ _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage));
+ }
+ }
+
+ public ClientState getState()
+ {
+ return _state.get();
+ }
+
+ public String getClientName()
+ {
+ return _clientJmsDelegate.getClientName();
+ }
+
+ public void setClientCommandVisitor(final ClientCommandVisitor visitor)
+ {
+ _visitor = visitor;
+ }
+
+ public void startTest()
+ {
+ if (_state.compareAndSet(ClientState.READY, ClientState.RUNNING_TEST))
+ {
+ try
+ {
+ _clientJmsDelegate.startConnections();
+ for (final ParticipantExecutor executor : _participantRegistry.executors())
+ {
+ executor.start(this);
+ }
+ }
+ catch (final Exception e)
+ {
+ try
+ {
+ tearDownTest();
+ }
+ catch (final Exception e2)
+ {
+ // ignore
+ }
+ throw new DistributedTestException("Error starting test: " + _clientJmsDelegate.getClientName(), e);
+ }
+ }
+ else
+ {
+ throw new DistributedTestException("Client '" + _clientJmsDelegate.getClientName()
+ + "' is not in READY state:" + _state.get());
+ }
+ }
+
+ public void tearDownTest()
+ {
+ if (_state.compareAndSet(ClientState.RUNNING_TEST, ClientState.READY))
+ {
+ LOGGER.info("Tearing down test on client: " + _clientJmsDelegate.getClientName());
+
+ _clientJmsDelegate.closeTestConnections();
+ }
+ else
+ {
+ throw new DistributedTestException("Client '" + _clientJmsDelegate.getClientName() + "' is not in RUNNING_TEST state! Ignoring tearDownTest");
+ }
+
+
+ _participantRegistry.clear();
+ }
+
+ public void sendResults(ParticipantResult testResult)
+ {
+ _clientJmsDelegate.sendResponseMessage(testResult);
+ LOGGER.info("Sent test results " + testResult);
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("clientJmsDelegate", _clientJmsDelegate).toString();
+ }
+
+ void setParticipantRegistry(ParticipantExecutorRegistry participantRegistry)
+ {
+ _participantRegistry = participantRegistry;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
new file mode 100644
index 0000000000..791897323e
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+import org.apache.qpid.disttest.Visitor;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.CreateMessageProviderCommand;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.CreateSessionCommand;
+import org.apache.qpid.disttest.message.NoOpCommand;
+import org.apache.qpid.disttest.message.StartTestCommand;
+import org.apache.qpid.disttest.message.StopClientCommand;
+import org.apache.qpid.disttest.message.TearDownTestCommand;
+
+public class ClientCommandVisitor extends Visitor
+{
+ private final Client _client;
+ private final ClientJmsDelegate _clientJmsDelegate;
+
+ public ClientCommandVisitor(final Client client, final ClientJmsDelegate clientJmsDelegate)
+ {
+ super();
+ _client = client;
+ _clientJmsDelegate = clientJmsDelegate;
+ }
+
+ public void visit(final StopClientCommand command)
+ {
+ _client.stop();
+ }
+
+ public void visit(final NoOpCommand command)
+ {
+ // no-op
+ }
+
+ public void visit(final CreateConnectionCommand command)
+ {
+ _clientJmsDelegate.createConnection(command);
+ }
+
+ public void visit(final CreateSessionCommand command)
+ {
+ _clientJmsDelegate.createSession(command);
+ }
+
+ public void visit(final CreateProducerCommand command)
+ {
+
+ final ProducerParticipant participant = new ProducerParticipant(_clientJmsDelegate, command);
+ _clientJmsDelegate.createProducer(command);
+ final ParticipantExecutor executor = new ParticipantExecutor(participant);
+ _client.addParticipantExecutor(executor);
+ }
+
+ public void visit(final CreateConsumerCommand command)
+ {
+ final ConsumerParticipant participant = new ConsumerParticipant(_clientJmsDelegate, command);
+ _clientJmsDelegate.createConsumer(command);
+ final ParticipantExecutor executor = new ParticipantExecutor(participant);
+ _client.addParticipantExecutor(executor);
+ }
+
+ public void visit(final StartTestCommand command)
+ {
+ _client.startTest();
+ }
+
+ public void visit(final TearDownTestCommand command)
+ {
+ _client.tearDownTest();
+ }
+
+ public void visit(final CreateMessageProviderCommand command)
+ {
+ _clientJmsDelegate.createMessageProvider(command);
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java
new file mode 100644
index 0000000000..c88c0a6c86
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+public enum ClientState
+{
+ CREATED, READY, STOPPED, RUNNING_TEST;
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
new file mode 100644
index 0000000000..89400b1f72
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerParticipant implements Participant
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerParticipant.class);
+
+ private final AtomicInteger _totalNumberOfMessagesReceived = new AtomicInteger(0);
+ private final NavigableSet<Integer> _allConsumedPayloadSizes = new ConcurrentSkipListSet<Integer>();
+ private final AtomicLong _totalPayloadSizeOfAllMessagesReceived = new AtomicLong(0);
+ private final CountDownLatch _asyncRunHasFinished = new CountDownLatch(1);
+ private final ClientJmsDelegate _jmsDelegate;
+ private final CreateConsumerCommand _command;
+ private final ParticipantResultFactory _resultFactory;
+
+ private long _startTime;
+
+ private volatile Exception _asyncMessageListenerException;
+
+ public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command)
+ {
+ _jmsDelegate = delegate;
+ _command = command;
+ _resultFactory = new ParticipantResultFactory();
+ }
+
+ @Override
+ public ParticipantResult doIt(String registeredClientName) throws Exception
+ {
+ final Date start = new Date();
+
+ if (_command.getMaximumDuration() == 0 && _command.getNumberOfMessages() == 0)
+ {
+ throw new DistributedTestException("number of messages and duration cannot both be zero");
+ }
+
+ if (_command.isSynchronous())
+ {
+ synchronousRun();
+ }
+ else
+ {
+ _jmsDelegate.registerListener(_command.getParticipantName(), new MessageListener(){
+
+ @Override
+ public void onMessage(Message message)
+ {
+ processAsynchMessage(message);
+ }
+
+ });
+
+ waitUntilMsgListenerHasFinished();
+ rethrowAnyAsyncMessageListenerException();
+ }
+
+ Date end = new Date();
+ int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+ long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
+ int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
+
+ ConsumerParticipantResult result = _resultFactory.createForConsumer(
+ getName(),
+ registeredClientName,
+ _command,
+ numberOfMessagesSent,
+ payloadSize,
+ totalPayloadSize,
+ start,
+ end);
+
+ return result;
+ }
+
+ private void synchronousRun()
+ {
+ LOGGER.debug("entered synchronousRun: " + this);
+
+ _startTime = System.currentTimeMillis();
+
+ Message message = null;
+
+ do
+ {
+ message = _jmsDelegate.consumeMessage(_command.getParticipantName(),
+ _command.getReceiveTimeout());
+ } while (processMessage(message));
+ }
+
+ /**
+ * @return whether to continue running (ie returns false if the message quota has been reached)
+ */
+ private boolean processMessage(Message message)
+ {
+ int messageCount = _totalNumberOfMessagesReceived.incrementAndGet();
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("message " + messageCount + " received by " + this);
+ }
+ int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
+ _allConsumedPayloadSizes.add(messagePayloadSize);
+ _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
+
+ boolean batchEnabled = _command.getBatchSize() > 0;
+ boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0;
+
+ if (!batchEnabled || batchComplete)
+ {
+ _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
+ }
+
+ boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages();
+ boolean reachedMaximumDuration = _command.getMaximumDuration() > 0 && System.currentTimeMillis() - _startTime >= _command.getMaximumDuration();
+ boolean finishedConsuming = reachedExpectedNumberOfMessages || reachedMaximumDuration;
+
+ if (finishedConsuming)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("message " + messageCount
+ + " reachedExpectedNumberOfMessages " + reachedExpectedNumberOfMessages
+ + " reachedMaximumDuration " + reachedMaximumDuration);
+ }
+
+ if (batchEnabled && !batchComplete)
+ {
+ // commit/acknowledge remaining messages if necessary
+ _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
+ }
+ return false;
+ }
+
+ return true;
+ }
+
+
+ /**
+ * Intended to be called from a {@link MessageListener}. Updates {@link #_asyncRunHasFinished} if
+ * no more messages should be processed, causing {@link #doIt(String)} to exit.
+ */
+ public void processAsynchMessage(Message message)
+ {
+ boolean continueRunning = true;
+ try
+ {
+ if (_startTime == 0)
+ {
+ // reset counter and start time on receiving of first message
+ _startTime = System.currentTimeMillis();
+ }
+
+ continueRunning = processMessage(message);
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Error occured consuming message " + _totalNumberOfMessagesReceived, e);
+ continueRunning = false;
+ _asyncMessageListenerException = e;
+ }
+
+ if(!continueRunning)
+ {
+ _asyncRunHasFinished.countDown();
+ }
+ }
+
+ @Override
+ public void releaseResources()
+ {
+ _jmsDelegate.closeTestConsumer(_command.getParticipantName());
+ }
+
+ private int getPayloadSizeForResultIfConstantOrZeroOtherwise(NavigableSet<Integer> allSizes)
+ {
+ return allSizes.size() == 1 ? _allConsumedPayloadSizes.first() : 0;
+ }
+
+ private void rethrowAnyAsyncMessageListenerException()
+ {
+ if (_asyncMessageListenerException != null)
+ {
+ throw new DistributedTestException(_asyncMessageListenerException);
+ }
+ }
+
+ private void waitUntilMsgListenerHasFinished() throws Exception
+ {
+ LOGGER.debug("waiting until message listener has finished for " + this);
+ _asyncRunHasFinished.await();
+ LOGGER.debug("Message listener has finished for " + this);
+ }
+
+ @Override
+ public String getName()
+ {
+ return _command.getParticipantName();
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java
new file mode 100644
index 0000000000..2dcf8940b6
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.client.property.PropertyValue;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+
+public class MessageProvider
+{
+ public static final String TTL = "ttl";
+
+ public static final String DELIVERY_MODE = "deliveryMode";
+
+ public static final String PRIORITY = "priority";
+
+ public static final String[] STANDARD_JMS_PROPERTIES = { "correlationID", DELIVERY_MODE,
+ "expiration", "messageID", PRIORITY, "redelivered", "replyTo", "timestamp", "type", TTL };
+
+ private Map<String, PropertyValue> _messageProperties;
+ private ConcurrentMap<Integer, Future<String>> _payloads;
+
+ public MessageProvider(Map<String, PropertyValue> messageProperties)
+ {
+ _messageProperties = messageProperties;
+ _payloads = new ConcurrentHashMap<Integer, Future<String>>();
+ }
+
+ public Message nextMessage(Session session, CreateProducerCommand command) throws JMSException
+ {
+ Message message = createTextMessage(session, command);
+ setMessageProperties(message);
+ return message;
+ }
+
+ public boolean isPropertySet(String name)
+ {
+ return _messageProperties != null && _messageProperties.containsKey(name);
+ }
+
+ public void setMessageProperties(Message message) throws JMSException
+ {
+ if (_messageProperties != null)
+ {
+ for (Entry<String, PropertyValue> entry : _messageProperties.entrySet())
+ {
+ String propertyName = entry.getKey();
+ Object propertyValue = entry.getValue().getValue();
+ if (isStandardProperty(propertyName))
+ {
+ setStandardProperty(message, propertyName, propertyValue);
+ }
+ else
+ {
+ setCustomProperty(message, propertyName, propertyValue);
+ }
+ }
+ }
+ }
+
+ protected void setCustomProperty(Message message, String propertyName, Object propertyValue) throws JMSException
+ {
+ if (propertyValue instanceof Integer)
+ {
+ message.setIntProperty(propertyName, ((Integer) propertyValue).intValue());
+ }
+ else if (propertyValue instanceof Long)
+ {
+ message.setLongProperty(propertyName, ((Long) propertyValue).longValue());
+ }
+ else if (propertyValue instanceof Boolean)
+ {
+ message.setBooleanProperty(propertyName, ((Boolean) propertyValue).booleanValue());
+ }
+ else if (propertyValue instanceof Byte)
+ {
+ message.setByteProperty(propertyName, ((Byte) propertyValue).byteValue());
+ }
+ else if (propertyValue instanceof Double)
+ {
+ message.setDoubleProperty(propertyName, ((Double) propertyValue).doubleValue());
+ }
+ else if (propertyValue instanceof Float)
+ {
+ message.setFloatProperty(propertyName, ((Float) propertyValue).floatValue());
+ }
+ else if (propertyValue instanceof Short)
+ {
+ message.setShortProperty(propertyName, ((Short) propertyValue).shortValue());
+ }
+ else if (propertyValue instanceof String)
+ {
+ message.setStringProperty(propertyName, (String) propertyValue);
+ }
+ else
+ {
+ message.setObjectProperty(propertyName, propertyValue);
+ }
+ }
+
+ protected void setStandardProperty(Message message, String property, Object propertyValue) throws JMSException
+ {
+ String propertyName = "JMS" + StringUtils.capitalize(property);
+ try
+ {
+ BeanUtils.setProperty(message, propertyName, propertyValue);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new DistributedTestException("Unable to set property " + propertyName + " :" + e.getMessage(), e);
+ }
+ catch (InvocationTargetException e)
+ {
+ if (e.getCause() instanceof JMSException)
+ {
+ throw ((JMSException) e.getCause());
+ }
+ else
+ {
+ throw new DistributedTestException("Unable to set property " + propertyName + " :" + e.getMessage(), e);
+ }
+ }
+ }
+
+ protected boolean isStandardProperty(String propertyName)
+ {
+ for (int i = 0; i < STANDARD_JMS_PROPERTIES.length; i++)
+ {
+ if (propertyName.equals(STANDARD_JMS_PROPERTIES[i]))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected Message createTextMessage(Session ssn, final CreateProducerCommand command) throws JMSException
+ {
+ String payload = getMessagePayload(command);
+ TextMessage msg = ssn.createTextMessage();
+ msg.setText(payload);
+ return msg;
+ }
+
+ protected String getMessagePayload(final CreateProducerCommand command)
+ {
+ FutureTask<String> createTextFuture = new FutureTask<String>(new Callable<String>()
+ {
+ @Override
+ public String call() throws Exception
+ {
+ return StringUtils.repeat("a", command.getMessageSize());
+ }
+ });
+
+ Future<String> future = _payloads.putIfAbsent(command.getMessageSize(), createTextFuture);
+ if (future == null)
+ {
+ createTextFuture.run();
+ future = createTextFuture;
+ }
+ String payload = null;
+ try
+ {
+ payload = future.get();
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Unable to create message payload :" + e.getMessage(), e);
+ }
+ return payload;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MessageProvider [_messageProperties=" + _messageProperties + "]";
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java
new file mode 100644
index 0000000000..941ec90565
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client;
+
+import org.apache.qpid.disttest.message.ParticipantResult;
+
+public interface Participant
+{
+ ParticipantResult doIt(String registeredClientName) throws Exception;
+
+ void releaseResources();
+
+ String getName();
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
new file mode 100644
index 0000000000..dee1391868
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParticipantExecutor
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantExecutor.class);
+
+ private static final ExecutorService SHARED_UNBOUNDED_THREAD_POOL = Executors.newCachedThreadPool(new DaemonThreadFactory());
+
+ private Executor _executor = SHARED_UNBOUNDED_THREAD_POOL;
+
+ private Client _client;
+
+ private final Participant _participant;
+
+ private final ParticipantResultFactory _factory;
+
+ public ParticipantExecutor(Participant participant)
+ {
+ _participant = participant;
+ _factory = new ParticipantResultFactory();
+ }
+
+ /**
+ * Schedules the test participant to be run in a background thread.
+ */
+ public void start(Client client)
+ {
+ _client = client;
+
+ LOGGER.info("Starting test participant in background thread: " + this);
+ _executor.execute(new ParticipantRunnable());
+ }
+
+ public String getParticipantName()
+ {
+ return _participant.getName();
+ }
+
+ void setExecutor(Executor executor)
+ {
+ _executor = executor;
+ }
+
+ private class ParticipantRunnable implements Runnable
+ {
+ @Override
+ public final void run()
+ {
+ Thread currentThread = Thread.currentThread();
+ final String initialThreadName = currentThread.getName();
+ currentThread.setName(initialThreadName + "-" + getParticipantName());
+
+ try
+ {
+ runParticipantAndSendResults();
+ }
+ finally
+ {
+ currentThread.setName(initialThreadName);
+ }
+ }
+
+ private void runParticipantAndSendResults()
+ {
+ ParticipantResult result = null;
+ try
+ {
+ result = _participant.doIt(_client.getClientName());
+ }
+ catch (Throwable t)
+ {
+ String errorMessage = "Unhandled error: " + t.getMessage();
+ LOGGER.error(errorMessage, t);
+ result = _factory.createForError(_participant.getName(), _client.getClientName(), errorMessage);
+ }
+ finally
+ {
+ _client.sendResults(result);
+ _participant.releaseResources();
+ }
+ }
+ }
+
+ private static final class DaemonThreadFactory implements ThreadFactory
+ {
+ @Override
+ public Thread newThread(Runnable r)
+ {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ return thread;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("participantName", getParticipantName())
+ .append("client", _client)
+ .toString();
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java
new file mode 100644
index 0000000000..3d9780e640
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ParticipantExecutorRegistry
+{
+ private final Set<ParticipantExecutor> _participantExecutors = Collections.synchronizedSet(new HashSet<ParticipantExecutor>());
+
+ public void add(ParticipantExecutor participantExecutor)
+ {
+ _participantExecutors.add(participantExecutor);
+ }
+
+ public void clear()
+ {
+ _participantExecutors.clear();
+ }
+
+ public Collection<ParticipantExecutor> executors()
+ {
+ return Collections.unmodifiableSet(_participantExecutors);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
new file mode 100644
index 0000000000..61b64b8c4f
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client;
+
+import java.util.Date;
+
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.CreateParticpantCommand;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.ProducerParticipantResult;
+
+public class ParticipantResultFactory
+{
+ public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, CreateConsumerCommand command, int numberOfMessagesReceived, int payloadSize, long totalPayloadReceived, Date start, Date end)
+ {
+ ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult();
+
+ setTestProperties(consumerParticipantResult, command, participantName, clientRegisteredName);
+ setTestResultProperties(consumerParticipantResult, numberOfMessagesReceived, payloadSize, totalPayloadReceived, start, end);
+
+ consumerParticipantResult.setTopic(command.isTopic());
+ consumerParticipantResult.setDurableSubscription(command.isDurableSubscription());
+ consumerParticipantResult.setBrowsingSubscription(command.isBrowsingSubscription());
+ consumerParticipantResult.setSelector(command.getSelector() != null);
+ consumerParticipantResult.setNoLocal(command.isNoLocal());
+ consumerParticipantResult.setSynchronousConsumer(command.isSynchronous());
+
+ return consumerParticipantResult;
+ }
+
+ public ProducerParticipantResult createForProducer(String participantName, String clientRegisteredName, CreateProducerCommand command, int numberOfMessagesSent, int payloadSize, long totalPayloadSent, Date start, Date end)
+ {
+ final ProducerParticipantResult participantResult = new ProducerParticipantResult();
+
+ participantResult.setStartDelay(command.getStartDelay());
+ participantResult.setDeliveryMode(command.getDeliveryMode());
+ participantResult.setPriority(command.getPriority());
+ participantResult.setInterval(command.getInterval());
+ participantResult.setTimeToLive(command.getTimeToLive());
+
+ setTestProperties(participantResult, command, participantName, clientRegisteredName);
+
+ setTestResultProperties(participantResult, numberOfMessagesSent, payloadSize, totalPayloadSent, start, end);
+
+ return participantResult;
+ }
+
+ private void setTestResultProperties(final ParticipantResult participantResult, int numberOfMessagesSent, int payloadSize, long totalPayloadReceived, Date start, Date end)
+ {
+ participantResult.setNumberOfMessagesProcessed(numberOfMessagesSent);
+ participantResult.setPayloadSize(payloadSize);
+ participantResult.setTotalPayloadProcessed(totalPayloadReceived);
+ participantResult.setStartDate(start);
+ participantResult.setEndDate(end);
+ }
+
+ private void setTestProperties(final ParticipantResult participantResult, CreateParticpantCommand command, String participantName, String clientRegisteredName)
+ {
+ participantResult.setParticipantName(participantName);
+ participantResult.setRegisteredClientName(clientRegisteredName);
+ participantResult.setBatchSize(command.getBatchSize());
+ participantResult.setMaximumDuration(command.getMaximumDuration());
+
+ }
+
+ public ParticipantResult createForError(String participantName, String clientRegisteredName, String errorMessage)
+ {
+ ParticipantResult result = new ParticipantResult();
+ result.setParticipantName(participantName);
+ result.setRegisteredClientName(clientRegisteredName);
+ result.setErrorMessage(errorMessage);
+
+ return result;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
new file mode 100644
index 0000000000..a46794f380
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.client;
+
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import javax.jms.Message;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerParticipant implements Participant
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProducerParticipant.class);
+
+ private final ClientJmsDelegate _jmsDelegate;
+
+ private final CreateProducerCommand _command;
+
+ private ParticipantResultFactory _resultFactory;
+
+ public ProducerParticipant(final ClientJmsDelegate jmsDelegate, final CreateProducerCommand command)
+ {
+ _jmsDelegate = jmsDelegate;
+ _command = command;
+ _resultFactory = new ParticipantResultFactory();
+ }
+
+ @Override
+ public ParticipantResult doIt(String registeredClientName) throws Exception
+ {
+ if (_command.getMaximumDuration() == 0 && _command.getNumberOfMessages() == 0)
+ {
+ throw new DistributedTestException("number of messages and duration cannot both be zero");
+ }
+
+ long expectedDuration = _command.getMaximumDuration() - _command.getStartDelay();
+
+ doSleepForStartDelay();
+
+ final long startTime = System.currentTimeMillis();
+
+ Message lastPublishedMessage = null;
+ int numberOfMessagesSent = 0;
+ long totalPayloadSizeOfAllMessagesSent = 0;
+ NavigableSet<Integer> allProducedPayloadSizes = new TreeSet<Integer>();
+
+ while (true)
+ {
+ numberOfMessagesSent++;
+
+ lastPublishedMessage = _jmsDelegate.sendNextMessage(_command);
+
+ int lastPayloadSize = _jmsDelegate.calculatePayloadSizeFrom(lastPublishedMessage);
+ totalPayloadSizeOfAllMessagesSent += lastPayloadSize;
+ allProducedPayloadSizes.add(lastPayloadSize);
+
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("message " + numberOfMessagesSent + " sent by " + this);
+ }
+
+ final boolean batchLimitReached = _command.getBatchSize() <= 0
+ || numberOfMessagesSent % _command.getBatchSize() == 0;
+
+ if (batchLimitReached)
+ {
+ _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
+
+ if (_command.getInterval() > 0)
+ {
+ // sleep for given time
+ Thread.sleep(_command.getInterval());
+ }
+ }
+
+ if (_command.getNumberOfMessages() > 0 && numberOfMessagesSent >= _command.getNumberOfMessages()
+ || expectedDuration > 0 && System.currentTimeMillis() - startTime >= expectedDuration)
+ {
+ break;
+ }
+ }
+
+ // commit the remaining batch messages
+ if (_command.getBatchSize() > 0 && numberOfMessagesSent % _command.getBatchSize() != 0)
+ {
+ _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName());
+ }
+
+ Date start = new Date(startTime);
+ Date end = new Date();
+ int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(allProducedPayloadSizes);
+
+ return _resultFactory.createForProducer(
+ getName(),
+ registeredClientName,
+ _command,
+ numberOfMessagesSent,
+ payloadSize,
+ totalPayloadSizeOfAllMessagesSent, start, end);
+ }
+
+ private int getPayloadSizeForResultIfConstantOrZeroOtherwise(NavigableSet<Integer> allPayloadSizes)
+ {
+ return allPayloadSizes.size() == 1 ? allPayloadSizes.first() : 0;
+ }
+
+ private void doSleepForStartDelay()
+ {
+ if (_command.getStartDelay() > 0)
+ {
+ // start delay is specified. Sleeping...
+ try
+ {
+ Thread.sleep(_command.getStartDelay());
+ }
+ catch (final InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void releaseResources()
+ {
+ _jmsDelegate.closeTestProducer(_command.getParticipantName());
+ }
+
+ @Override
+ public String getName()
+ {
+ return _command.getParticipantName();
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java
new file mode 100644
index 0000000000..a49ebf756e
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+/**
+ * Provides support to generate message property values.
+ */
+public abstract class GeneratedPropertySupport implements GeneratedPropertyValue
+{
+ private Object _lastValue;
+
+ public GeneratedPropertySupport()
+ {
+ super();
+ _lastValue = null;
+ }
+
+ @Override
+ public Object getValue()
+ {
+ Object result = nextValue();
+ result = evaluate(result);
+ synchronized(this)
+ {
+ _lastValue = result;
+ }
+ return result;
+ }
+
+ protected Object evaluate(Object result)
+ {
+ while (result instanceof PropertyValue)
+ {
+ result = ((PropertyValue)result).getValue();
+ }
+ return result;
+ }
+
+ public abstract Object nextValue();
+
+ public synchronized Object getLastValue()
+ {
+ return _lastValue;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GeneratedPropertyValue [value=" + getLastValue() + ", @def=" + getDefinition() + "]";
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java
new file mode 100644
index 0000000000..39c093fac5
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+/**
+ * Provides operations to generate message property values.
+ */
+public interface GeneratedPropertyValue extends PropertyValue
+{
+ public String getDefinition();
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java
new file mode 100644
index 0000000000..4444351976
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Provides property values from the underlining list of items.
+ */
+public class ListPropertyValue extends GeneratedPropertySupport
+{
+ public static final String DEF_VALUE = "list";
+ private List<PropertyValue> _items;
+ private boolean _cyclic;
+ private int _currentIndex;
+
+ public ListPropertyValue()
+ {
+ super();
+ _cyclic = true;
+ _currentIndex = 0;
+ _items = new ArrayList<PropertyValue>();
+ }
+
+ public synchronized void setItems(List<PropertyValue> items)
+ {
+ _items = new ArrayList<PropertyValue>(items);
+ }
+
+ public synchronized List<PropertyValue> getItems()
+ {
+ return Collections.unmodifiableList(_items);
+ }
+
+ public synchronized void setCyclic(boolean cyclic)
+ {
+ _cyclic = cyclic;
+ }
+
+ public synchronized boolean isCyclic()
+ {
+ return _cyclic;
+ }
+
+ @Override
+ public synchronized Object nextValue()
+ {
+ if (_currentIndex >= _items.size())
+ {
+ if (_cyclic)
+ {
+ _currentIndex = 0;
+ }
+ else
+ {
+ _currentIndex = _items.size() -1;
+ }
+ }
+ Object nextValue = _items.get(_currentIndex);
+ _currentIndex++;
+ return nextValue;
+ }
+
+ @Override
+ public String getDefinition()
+ {
+ return DEF_VALUE;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + _currentIndex;
+ result = prime * result + (_cyclic ? 1231 : 1237);
+ result = prime * result + ((_items == null) ? 0 : _items.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null || !(obj instanceof ListPropertyValue))
+ {
+ return false;
+ }
+ ListPropertyValue other = (ListPropertyValue) obj;
+ if (_cyclic != other._cyclic)
+ {
+ return false;
+ }
+ if (_items == null && other._items != null)
+ {
+ return false;
+ }
+ return _items.equals(other._items);
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java
new file mode 100644
index 0000000000..e0ae137c35
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+import java.util.Arrays;
+
+/**
+ * Provides support for numeric generators with lower and upper boundaries.
+ */
+public abstract class NumericGeneratedPropertySupport extends GeneratedPropertySupport
+{
+ public static final Class<?>[] SUPPORTED_TYPES = { double.class, float.class, int.class, long.class, short.class,
+ byte.class };
+
+ private String _type;
+ private double _upper;
+ private double _lower;
+
+
+ public NumericGeneratedPropertySupport()
+ {
+ super();
+ _type = SUPPORTED_TYPES[0].getName();
+ _upper = Double.MAX_VALUE;
+ _lower = 0.0;
+ }
+
+ public synchronized String getType()
+ {
+ return _type;
+ }
+
+ public synchronized double getUpper()
+ {
+ return _upper;
+ }
+
+ public synchronized double getLower()
+ {
+ return _lower;
+ }
+
+ public synchronized void setUpper(double upper)
+ {
+ _upper = upper;
+ }
+
+ public synchronized void setLower(double lower)
+ {
+ _lower = lower;
+ }
+
+ public synchronized void setType(String type)
+ {
+ _type = toClass(type).getName();
+ }
+
+ protected Class<?> toClass(String type)
+ {
+ Class<?> t = null;
+ for (int i = 0; i < SUPPORTED_TYPES.length; i++)
+ {
+ if (SUPPORTED_TYPES[i].getName().equals(type))
+ {
+ t = SUPPORTED_TYPES[i];
+ break;
+ }
+ }
+ if (t == null)
+ {
+ throw new IllegalArgumentException("Type " + type + " is not supported: "
+ + Arrays.toString(SUPPORTED_TYPES));
+ }
+ return t;
+ }
+
+ @Override
+ public Object nextValue()
+ {
+ double result = nextDouble();
+ return doubleToNumber(result, toClass(_type));
+ }
+
+ protected Number doubleToNumber(double value, Class<?> targetType)
+ {
+ Number result = null;
+ if (targetType == double.class)
+ {
+ result = new Double(value);
+ }
+ else if (targetType == float.class)
+ {
+ result = new Float(value);
+ }
+ else if (targetType == int.class)
+ {
+ result = new Integer((int) value);
+ }
+ else if (targetType == long.class)
+ {
+ result = new Long((long) value);
+ }
+ else if (targetType == short.class)
+ {
+ result = new Short((short) value);
+ }
+ else if (targetType == byte.class)
+ {
+ result = new Byte((byte) value);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Type " + targetType + " is not supported: "
+ + Arrays.toString(SUPPORTED_TYPES));
+ }
+ return result;
+ }
+
+ protected abstract double nextDouble();
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ long temp;
+ temp = Double.doubleToLongBits(_lower);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ result = prime * result + ((_type == null) ? 0 : _type.hashCode());
+ temp = Double.doubleToLongBits(_upper);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null || !(obj instanceof NumericGeneratedPropertySupport))
+ {
+ return false;
+ }
+ NumericGeneratedPropertySupport other = (NumericGeneratedPropertySupport) obj;
+ if (Double.doubleToLongBits(_lower) != Double.doubleToLongBits(other._lower)
+ || Double.doubleToLongBits(_upper) != Double.doubleToLongBits(other._upper))
+ {
+ return false;
+ }
+ if (_type == null && other._type != null)
+ {
+ return false;
+ }
+ else if (!_type.equals(other._type))
+ {
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java
new file mode 100644
index 0000000000..97adc0cee1
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+/**
+ * Provides operations to get a message property value.
+ */
+public interface PropertyValue
+{
+ public Object getValue();
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java
new file mode 100644
index 0000000000..fa44b2da1e
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.qpid.disttest.DistributedTestException;
+
+/**
+ * Creates property value instances using given alias (type) value.
+ */
+public class PropertyValueFactory
+{
+ public PropertyValue createPropertyValue(String type)
+ {
+ try
+ {
+ return (PropertyValue)getPropertyValueClass(type).newInstance();
+ }
+ catch(Exception e)
+ {
+ throw new DistributedTestException("Unable to create a generator for a type:" + type, e);
+ }
+ }
+
+ public Class<?> getPropertyValueClass(String type) throws ClassNotFoundException
+ {
+ String className = "org.apache.qpid.disttest.client.property." + StringUtils.capitalize(type) + "PropertyValue";
+ return Class.forName(className);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java
new file mode 100644
index 0000000000..4f44a4bca8
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+/**
+ * Generates random property values in a given lower and upper boundaries.
+ */
+public class RandomPropertyValue extends NumericGeneratedPropertySupport
+{
+ public static final String DEF_VALUE = "random";
+
+ public RandomPropertyValue()
+ {
+ super();
+ }
+
+ @Override
+ protected double nextDouble()
+ {
+ double lower = getLower();
+ double upper = getUpper();
+ return lower + Math.random() * (upper - lower);
+ }
+
+ @Override
+ public String getDefinition()
+ {
+ return DEF_VALUE;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java
new file mode 100644
index 0000000000..3aca4d4bca
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+/**
+ * Generates property values from a range with given lower and upper boundaries.
+ */
+public class RangePropertyValue extends NumericGeneratedPropertySupport
+{
+ public static final String DEF_VALUE = "range";
+ private double _step;
+ private double _currentValue;
+ private boolean _cyclic;
+
+ public RangePropertyValue()
+ {
+ super();
+ _cyclic = true;
+ _currentValue = 0.0;
+ _step = 1.0;
+ }
+
+ public synchronized double getStep()
+ {
+ return _step;
+ }
+
+ public synchronized boolean isCyclic()
+ {
+ return _cyclic;
+ }
+
+ public synchronized void setCyclic(boolean cyclic)
+ {
+ _cyclic = cyclic;
+ }
+
+ public synchronized void setStep(double step)
+ {
+ _step = step;
+ }
+
+ @Override
+ public synchronized double nextDouble()
+ {
+ double result = 0.0;
+ double lower = getLower();
+ double upper = getUpper();
+ if (_currentValue < lower)
+ {
+ _currentValue = lower;
+ }
+ else if (_currentValue > upper)
+ {
+ if (_cyclic)
+ {
+ _currentValue = lower;
+ }
+ else
+ {
+ _currentValue = upper;
+ }
+ }
+ result = _currentValue;
+ _currentValue += _step;
+ return result;
+ }
+
+ @Override
+ public String getDefinition()
+ {
+ return DEF_VALUE;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ long temp;
+ temp = Double.doubleToLongBits(_currentValue);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ result = prime * result + (_cyclic ? 1231 : 1237);
+ temp = Double.doubleToLongBits(_step);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (!(obj instanceof RangePropertyValue))
+ {
+ return false;
+ }
+ if (!super.equals(obj))
+ {
+ return false;
+ }
+ RangePropertyValue other = (RangePropertyValue) obj;
+ if (Double.doubleToLongBits(_currentValue) != Double.doubleToLongBits(other._currentValue)
+ || Double.doubleToLongBits(_step) != Double.doubleToLongBits(other._step) || _cyclic != other._cyclic)
+ {
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java
new file mode 100644
index 0000000000..9141e68656
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.client.property;
+
+/**
+ * Simple property value holder for a constant properties.
+ */
+public class SimplePropertyValue implements PropertyValue
+{
+ private Object _value;
+
+ public SimplePropertyValue()
+ {
+ super();
+ }
+
+ public SimplePropertyValue(Object value)
+ {
+ super();
+ this._value = value;
+ }
+
+ @Override
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SimplePropertyValue [value=" + _value + "]";
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_value == null) ? 0 : _value.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass())
+ {
+ return false;
+ }
+ SimplePropertyValue other = (SimplePropertyValue) obj;
+ if (_value == null && other._value != null)
+ {
+ return false;
+ }
+ return _value.equals(other._value);
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java
new file mode 100644
index 0000000000..b049da1f84
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientRegistry
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientRegistry.class);
+
+ private final Set<String> _registeredClientNames = new ConcurrentSkipListSet<String>();
+
+ public void registerClient(String clientName)
+ {
+ final boolean alreadyContainsClient = !_registeredClientNames.add(clientName);
+ if (alreadyContainsClient)
+ {
+ throw new DistributedTestException("Duplicate client name " + clientName);
+ }
+
+ LOGGER.info("Client registered: " + clientName);
+ }
+
+ public Collection<String> getClients()
+ {
+ return Collections.unmodifiableSet(_registeredClientNames);
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java
new file mode 100644
index 0000000000..6c0c253807
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller;
+
+import org.apache.qpid.disttest.message.Command;
+
+public class CommandForClient
+{
+ private String _clientName;
+ private Command _command;
+
+ public CommandForClient(String clientName, Command command)
+ {
+ _clientName = clientName;
+ _command = command;
+ }
+
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ public Command getCommand()
+ {
+ return _command;
+ }
+
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java
new file mode 100644
index 0000000000..e2f40bebe8
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller;
+
+import org.apache.qpid.disttest.message.Command;
+
+public interface CommandListener
+{
+
+ public abstract void processCommand(Command command);
+
+ public boolean supports(Command command);
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java
new file mode 100644
index 0000000000..a5e0933704
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.config.Config;
+import org.apache.qpid.disttest.controller.config.TestInstance;
+import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.apache.qpid.disttest.message.Response;
+import org.apache.qpid.disttest.message.StopClientCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Controller
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(Controller.class);
+
+ private final long _registrationTimeout;
+ private final long _commandResponseTimeout;
+
+ private final ControllerJmsDelegate _jmsDelegate;
+
+ private volatile CountDownLatch _clientRegistrationLatch;
+ private volatile CountDownLatch _stopClientsResponseLatch = null;
+
+ private Config _config;
+ private TestRunnerFactory _testRunnerFactory;
+ private ClientRegistry _clientRegistry;
+
+ private long _testResultTimeout = TestRunner.WAIT_FOREVER;
+
+ public Controller(final ControllerJmsDelegate jmsDelegate, long registrationTimeout, long commandResponseTimeout)
+ {
+ _jmsDelegate = jmsDelegate;
+ _registrationTimeout = registrationTimeout;
+ _commandResponseTimeout = commandResponseTimeout;
+ _testRunnerFactory = new TestRunnerFactory();
+ _clientRegistry = new ClientRegistry();
+ }
+
+ public void setConfig(Config config)
+ {
+ _config = config;
+ validateConfiguration();
+ int numberOfClients = config.getTotalNumberOfClients();
+ _clientRegistrationLatch = new CountDownLatch(numberOfClients);
+
+ _jmsDelegate.addCommandListener(new RegisterClientCommandListener());
+ _jmsDelegate.addCommandListener(new StopClientResponseListener());
+ _jmsDelegate.start();
+ }
+
+
+ public void awaitClientRegistrations()
+ {
+ LOGGER.info("Awaiting client registration");
+ awaitLatch(_clientRegistrationLatch, _registrationTimeout, "Timed out waiting for registrations. Expecting %d more registrations");
+ }
+
+ private void validateConfiguration()
+ {
+ if (_config == null || _config.getTotalNumberOfClients() == 0)
+ {
+ throw new DistributedTestException("No controller config or no clients specified in test config");
+ }
+ }
+
+ private void awaitLatch(CountDownLatch latch, long timeout, String messageWithOneDecimalPlaceholder)
+ {
+ try
+ {
+ final boolean countedDownOK = latch.await(timeout, TimeUnit.MILLISECONDS);
+ if (!countedDownOK)
+ {
+ final long latchCount = latch.getCount();
+ String formattedMessage = String.format(messageWithOneDecimalPlaceholder, latchCount);
+ throw new DistributedTestException(formattedMessage);
+ }
+ }
+ catch (final InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void registerClient(final RegisterClientCommand registrationCommand)
+ {
+ final String clientName = registrationCommand.getClientName();
+
+ _clientRegistry.registerClient(clientName);
+ _jmsDelegate.registerClient(registrationCommand);
+
+ _clientRegistrationLatch.countDown();
+ LOGGER.info("Counted down latch for client: " + clientName + " latch count=" + _clientRegistrationLatch.getCount());
+ }
+
+ void processStopClientResponse(final Response response)
+ {
+ _stopClientsResponseLatch.countDown();
+ if (response.hasError())
+ {
+ LOGGER.error("Client " + response.getRegisteredClientName() + " reported exception in response to command : " +
+ response.getErrorMessage());
+ }
+ }
+
+ public void stopAllRegisteredClients()
+ {
+ Collection<String> registeredClients = _clientRegistry.getClients();
+
+ LOGGER.info("Stopping all clients");
+ _stopClientsResponseLatch = new CountDownLatch(registeredClients.size());
+ Command command = new StopClientCommand();
+ for (final String clientName : registeredClients)
+ {
+ _jmsDelegate.sendCommandToClient(clientName, command);
+ }
+
+ awaitLatch(_stopClientsResponseLatch, _commandResponseTimeout, "Timed out waiting for stop command responses. Expecting %d more responses.");
+
+ LOGGER.info("Stopped all clients");
+ }
+
+
+ public ResultsForAllTests runAllTests()
+ {
+ LOGGER.info("Running all tests");
+
+ ResultsForAllTests resultsForAllTests = new ResultsForAllTests();
+
+ for (TestInstance testInstance : _config.getTests())
+ {
+
+ ParticipatingClients participatingClients = new ParticipatingClients(_clientRegistry, testInstance.getClientNames());
+
+ LOGGER.info("Running test " + testInstance + ". Participating clients: " + participatingClients.getRegisteredNames());
+ TestRunner runner = _testRunnerFactory.createTestRunner(participatingClients,
+ testInstance,
+ _jmsDelegate,
+ _commandResponseTimeout,
+ _testResultTimeout);
+
+ TestResult testResult = runner.run();
+ LOGGER.info("Finished test " + testInstance);
+
+ resultsForAllTests.add(testResult);
+ }
+
+ return resultsForAllTests;
+ }
+
+ private final class StopClientResponseListener implements CommandListener
+ {
+ @Override
+ public boolean supports(Command command)
+ {
+ return command.getType() == CommandType.RESPONSE && ((Response)command).getInReplyToCommandType() == CommandType.STOP_CLIENT;
+ }
+
+ @Override
+ public void processCommand(Command command)
+ {
+ processStopClientResponse((Response)command);
+ }
+ }
+
+ private final class RegisterClientCommandListener implements
+ CommandListener
+ {
+ @Override
+ public boolean supports(Command command)
+ {
+ return command.getType() == CommandType.REGISTER_CLIENT;
+ }
+
+ @Override
+ public void processCommand(Command command)
+ {
+ registerClient((RegisterClientCommand)command);
+ }
+ }
+
+ public void setTestResultTimeout(final long testResultTimeout)
+ {
+ _testResultTimeout = testResultTimeout;
+
+ }
+
+ void setClientRegistry(ClientRegistry clientRegistry)
+ {
+ _clientRegistry = clientRegistry;
+
+ }
+
+ void setTestRunnerFactory(TestRunnerFactory factory)
+ {
+ if (factory == null)
+ {
+ throw new IllegalArgumentException("TestRunnerFactory cannot be null!");
+ }
+ _testRunnerFactory = factory;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java
new file mode 100644
index 0000000000..077d628697
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.DualHashBidiMap;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+public class ParticipatingClients
+{
+ private final BidiMap _configuredToRegisteredNameMap;
+
+ public ParticipatingClients(ClientRegistry clientRegistry, List<String> configuredClientNamesForTest)
+ {
+ _configuredToRegisteredNameMap = mapConfiguredToRegisteredClientNames(configuredClientNamesForTest, clientRegistry);
+ }
+
+ public String getRegisteredNameFromConfiguredName(String clientConfiguredName)
+ {
+ String registeredClientName = (String) _configuredToRegisteredNameMap.get(clientConfiguredName);
+ if (registeredClientName == null)
+ {
+ throw new IllegalArgumentException("Unrecognised client configured name " + clientConfiguredName
+ + " Mapping is " + _configuredToRegisteredNameMap);
+ }
+ return registeredClientName;
+ }
+
+ public String getConfiguredNameFromRegisteredName(String registeredClientName)
+ {
+ String clientConfiguredName = (String) _configuredToRegisteredNameMap.getKey(registeredClientName);
+ if (clientConfiguredName == null)
+ {
+ throw new IllegalArgumentException("Unrecognised client registered name " + registeredClientName
+ + " Mapping is " + _configuredToRegisteredNameMap);
+ }
+
+ return clientConfiguredName;
+ }
+
+ private BidiMap mapConfiguredToRegisteredClientNames(List<String> configuredClientNamesForTest, ClientRegistry clientRegistry)
+ {
+ BidiMap configuredToRegisteredNameMap = new DualHashBidiMap();
+
+ TreeSet<String> registeredClients = new TreeSet<String>(clientRegistry.getClients());
+ for (String configuredClientName : configuredClientNamesForTest)
+ {
+ String allocatedClientName = registeredClients.pollFirst();
+ if (allocatedClientName == null)
+ {
+ throw new IllegalArgumentException("Too few clients in registry " + clientRegistry + " configured clients " + configuredClientNamesForTest);
+ }
+ configuredToRegisteredNameMap.put(configuredClientName, allocatedClientName);
+ }
+
+ return configuredToRegisteredNameMap;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<String> getRegisteredNames()
+ {
+ return _configuredToRegisteredNameMap.values();
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("configuredToRegisteredNameMap", _configuredToRegisteredNameMap).toString();
+ }
+
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java
new file mode 100644
index 0000000000..6c5ff3450c
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.disttest.results.aggregation.ITestResult;
+
+public class ResultsForAllTests
+{
+ private List<ITestResult> _results = new ArrayList<ITestResult>();
+ private boolean _hasErrors;
+
+ public List<ITestResult> getTestResults()
+ {
+ return _results;
+ }
+
+ public void add(ITestResult testResult)
+ {
+ _results.add(testResult);
+ if(testResult.hasErrors())
+ {
+ _hasErrors = true;
+ }
+ }
+
+ public boolean hasErrors()
+ {
+ return _hasErrors;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java
new file mode 100644
index 0000000000..756c641532
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.results.aggregation.ITestResult;
+
+public class TestResult implements ITestResult
+{
+ private final SortedSet<ParticipantResult> _participantResults = Collections.synchronizedSortedSet(
+ new TreeSet<ParticipantResult>(ParticipantResult.PARTICIPANT_NAME_COMPARATOR));
+
+ private boolean _hasErrors;
+ private String _name;
+
+ public TestResult(String name)
+ {
+ _name = name;
+ }
+
+ @Override
+ public List<ParticipantResult> getParticipantResults()
+ {
+ List<ParticipantResult> list = new ArrayList<ParticipantResult>(_participantResults);
+ return Collections.unmodifiableList(list);
+ }
+
+ public void addParticipantResult(ParticipantResult participantResult)
+ {
+ _participantResults.add(participantResult);
+ if(participantResult.hasError())
+ {
+ _hasErrors = true;
+ }
+ }
+
+ @Override
+ public boolean hasErrors()
+ {
+ return _hasErrors;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _name;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java
new file mode 100644
index 0000000000..f81b691ea6
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.disttest.controller.config.TestInstance;
+import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.Response;
+import org.apache.qpid.disttest.message.StartTestCommand;
+import org.apache.qpid.disttest.message.TearDownTestCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestRunner
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestRunner.class);
+
+ private static final long PARTICIPANT_RESULTS_LOG_INTERVAL = 60000;
+ public static final long WAIT_FOREVER = -1;
+
+ private final long _commandResponseTimeout;
+
+ private final Set<CommandType> _setOfResponsesToExpect = Collections.synchronizedSet(new HashSet<CommandType>());
+
+ private final ParticipatingClients _participatingClients;
+
+ private final TestInstance _testInstance;
+ private ControllerJmsDelegate _jmsDelegate;
+
+ private volatile CountDownLatch _commandResponseLatch = null;
+ private final CountDownLatch _testResultsLatch;
+ private final TestResult _testResult;
+
+ /** Length of time to await test results or {@value #WAIT_FOREVER} */
+ private final long _testResultTimeout;
+
+
+ public TestRunner(ParticipatingClients participatingClients, TestInstance testInstance, ControllerJmsDelegate jmsDelegate, long commandResponseTimeout, long testResultTimeout)
+ {
+ _participatingClients = participatingClients;
+ _testInstance = testInstance;
+ _jmsDelegate = jmsDelegate;
+ _commandResponseTimeout = commandResponseTimeout;
+ _testResultsLatch = new CountDownLatch(testInstance.getTotalNumberOfParticipants());
+ _testResultTimeout = testResultTimeout;
+ _testResult = new TestResult(testInstance.getName());
+ }
+
+ public TestResult run()
+ {
+ final ParticipantResultListener participantResultListener = new ParticipantResultListener();
+ TestCommandResponseListener testCommandResponseListener = new TestCommandResponseListener();
+
+ try
+ {
+ _jmsDelegate.addCommandListener(testCommandResponseListener);
+ _jmsDelegate.addCommandListener(participantResultListener);
+
+ runParts();
+
+ return _testResult;
+ }
+ finally
+ {
+ _jmsDelegate.removeCommandListener(participantResultListener);
+ _jmsDelegate.removeCommandListener(testCommandResponseListener);
+ }
+ }
+
+ private void runParts()
+ {
+ boolean queuesCreated = false;
+ try
+ {
+ createQueues();
+ queuesCreated = true;
+ sendTestSetupCommands();
+ awaitCommandResponses();
+ sendCommandToParticipatingClients(new StartTestCommand());
+ awaitCommandResponses();
+
+ awaitTestResults();
+
+ sendCommandToParticipatingClients(new TearDownTestCommand());
+ awaitCommandResponses();
+ }
+ finally
+ {
+ if (queuesCreated)
+ {
+ deleteQueues();
+ }
+ }
+ }
+
+ void createQueues()
+ {
+ List<QueueConfig> queues = _testInstance.getQueues();
+ if (!queues.isEmpty())
+ {
+ _jmsDelegate.createQueues(queues);
+ }
+ }
+
+ void sendTestSetupCommands()
+ {
+ List<CommandForClient> commandsForAllClients = _testInstance.createCommands();
+ _commandResponseLatch = new CountDownLatch(commandsForAllClients.size());
+ for (CommandForClient commandForClient : commandsForAllClients)
+ {
+ String configuredClientName = commandForClient.getClientName();
+ String registeredClientName = _participatingClients.getRegisteredNameFromConfiguredName(configuredClientName);
+
+ Command command = commandForClient.getCommand();
+ sendCommandInternal(registeredClientName, command);
+ }
+ }
+
+ void awaitCommandResponses()
+ {
+ awaitLatch(_commandResponseLatch, _commandResponseTimeout, "Timed out waiting for command responses. Expecting %d more responses.");
+ }
+
+
+ void processCommandResponse(final Response response)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Received response for command " + response);
+ }
+
+ _commandResponseLatch.countDown();
+ checkForResponseError(response);
+ }
+
+
+ void awaitTestResults()
+ {
+ long timeout = _testResultTimeout;
+ DistributedTestException lastException = null;
+
+ boolean waitForever = _testResultTimeout == WAIT_FOREVER;
+ final long interval = waitForever ? PARTICIPANT_RESULTS_LOG_INTERVAL : Math.min(PARTICIPANT_RESULTS_LOG_INTERVAL, _testResultTimeout);
+
+ while(_testResultsLatch.getCount() > 0 && (waitForever || timeout > 0))
+ {
+ try
+ {
+ awaitLatch(_testResultsLatch, interval, "Waiting for participant results... Expecting %d more responses.");
+ }
+ catch (DistributedTestException e)
+ {
+ lastException = e;
+ LOGGER.info(e.getMessage());
+ }
+
+ if (!waitForever)
+ {
+ timeout =- interval;
+ }
+ }
+
+ if (_testResultsLatch.getCount() > 0)
+ {
+ throw lastException;
+ }
+ }
+
+ void deleteQueues()
+ {
+ List<QueueConfig> queues = _testInstance.getQueues();
+ if (!queues.isEmpty())
+ {
+ _jmsDelegate.deleteQueues(queues);
+ }
+ }
+
+ void sendCommandToParticipatingClients(final Command command)
+ {
+ Collection<String> participatingRegisteredClients = _participatingClients.getRegisteredNames();
+ _commandResponseLatch = new CountDownLatch(participatingRegisteredClients.size());
+ for (final String clientName : participatingRegisteredClients)
+ {
+ sendCommandInternal(clientName, command);
+ }
+ }
+
+ public void processParticipantResult(ParticipantResult result)
+ {
+ setOriginalTestDetailsOn(result);
+
+ _testResult.addParticipantResult(result);
+ LOGGER.info("Received result " + result);
+
+ _testResultsLatch.countDown();
+ checkForResponseError(result);
+ }
+
+ private void setOriginalTestDetailsOn(ParticipantResult result)
+ {
+ // Client knows neither the configured client name nor test name
+ String registeredClientName = result.getRegisteredClientName();
+ String configuredClient = _participatingClients.getConfiguredNameFromRegisteredName(registeredClientName);
+
+ result.setConfiguredClientName(configuredClient);
+ result.setTestName(_testInstance.getName());
+ result.setIterationNumber(_testInstance.getIterationNumber());
+ }
+
+ private void sendCommandInternal(String registeredClientName, Command command)
+ {
+ _setOfResponsesToExpect.add(command.getType());
+ _jmsDelegate.sendCommandToClient(registeredClientName, command);
+ }
+
+ private void awaitLatch(CountDownLatch latch, long timeout, String messageWithOneDecimalPlaceholder)
+ {
+ try
+ {
+ final boolean countedDownOK = latch.await(timeout, TimeUnit.MILLISECONDS);
+ if (!countedDownOK)
+ {
+ final long latchCount = latch.getCount();
+ String formattedMessage = String.format(messageWithOneDecimalPlaceholder, latchCount);
+ throw new DistributedTestException(formattedMessage);
+ }
+ }
+ catch (final InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void checkForResponseError(final Response response)
+ {
+ if (response.hasError())
+ {
+ LOGGER.error("Client " + response.getRegisteredClientName() + " reported error " + response);
+ }
+ }
+
+ final class ParticipantResultListener implements CommandListener
+ {
+ @Override
+ public boolean supports(Command command)
+ {
+ return command instanceof ParticipantResult;
+ }
+
+ @Override
+ public void processCommand(Command command)
+ {
+ processParticipantResult((ParticipantResult) command);
+
+ }
+ }
+
+ final class TestCommandResponseListener implements CommandListener
+ {
+ @Override
+ public void processCommand(Command command)
+ {
+ processCommandResponse((Response)command);
+ }
+
+ @Override
+ public boolean supports(Command command)
+ {
+ CommandType type = command.getType();
+ if (type == CommandType.RESPONSE)
+ {
+ Response response = (Response)command;
+ return _setOfResponsesToExpect.contains(response.getInReplyToCommandType());
+ }
+ return false;
+ }
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java
new file mode 100644
index 0000000000..bf0e5afb9c
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller;
+
+import org.apache.qpid.disttest.controller.config.TestInstance;
+import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
+
+public class TestRunnerFactory
+{
+ public TestRunner createTestRunner(ParticipatingClients participatingClients, TestInstance testInstance, ControllerJmsDelegate jmsDelegate, long commandResponseTimeout, long testResultTimeout)
+ {
+ return new TestRunner(participatingClients, testInstance, jmsDelegate, commandResponseTimeout, testResultTimeout);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java
new file mode 100644
index 0000000000..4353a85cd3
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.disttest.controller.CommandForClient;
+import org.apache.qpid.disttest.message.Command;
+
+public class ClientConfig
+{
+ /*
+ * TODO add this field when repeating groups of clients need to be used. Talk to Phil and Keith!
+ * private int _instances;
+ */
+
+ private List<ConnectionConfig> _connections;
+ private List<MessageProviderConfig> _messageProviders;
+ private String _name;
+
+ public ClientConfig()
+ {
+ _name = null;
+ _connections = Collections.emptyList();
+ _messageProviders = Collections.emptyList();
+ }
+
+ public ClientConfig(String name, ConnectionConfig... connections)
+ {
+ this(name, Arrays.asList(connections), null);
+ }
+
+ public ClientConfig(String name, List<ConnectionConfig> connections, List<MessageProviderConfig> messageProviders)
+ {
+ _name = name;
+ _connections = connections;
+ if (messageProviders == null)
+ {
+ _messageProviders = Collections.emptyList();
+ }
+ else
+ {
+ _messageProviders = messageProviders;
+ }
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public List<ConnectionConfig> getConnections()
+ {
+ return Collections.unmodifiableList(_connections);
+ }
+
+ public List<CommandForClient> createCommands()
+ {
+ List<CommandForClient> commandsForClient = new ArrayList<CommandForClient>();
+
+ for (MessageProviderConfig messageProvider : _messageProviders)
+ {
+ Command command = messageProvider.createCommand();
+ commandsForClient.add(new CommandForClient(_name, command));
+ }
+ for (ConnectionConfig connection : _connections)
+ {
+ List<Command> commands = connection.createCommands();
+ for (Command command : commands)
+ {
+ commandsForClient.add(new CommandForClient(_name, command));
+ }
+ }
+ return commandsForClient;
+ }
+
+ public int getTotalNumberOfParticipants()
+ {
+ int numOfParticipants = 0;
+ for (ConnectionConfig connection : _connections)
+ {
+ numOfParticipants = numOfParticipants + connection.getTotalNumberOfParticipants();
+ }
+ return numOfParticipants;
+ }
+
+ public List<MessageProviderConfig> getMessageProviders()
+ {
+ return Collections.unmodifiableList(_messageProviders);
+ }
+
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java
new file mode 100644
index 0000000000..fe56137276
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class Config
+{
+ private List<TestConfig> _tests;
+
+ public Config()
+ {
+ super();
+ _tests = Collections.emptyList();
+ }
+
+ public Config(TestConfig... tests)
+ {
+ _tests = Arrays.asList(tests);
+ }
+
+ public List<TestInstance> getTests()
+ {
+ List<TestInstance> testInstances = new ArrayList<TestInstance>();
+ for (TestConfig testConfig : _tests)
+ {
+ int iterationNumber = 0;
+
+ List<IterationValue> iterationValues = testConfig.getIterationValues();
+ if(iterationValues.isEmpty())
+ {
+ testInstances.add(new TestInstance(testConfig));
+ }
+ else
+ {
+ for (IterationValue iterationValue : iterationValues)
+ {
+ testInstances.add(new TestInstance(testConfig, iterationNumber, iterationValue));
+ iterationNumber++;
+ }
+ }
+ }
+
+ return Collections.unmodifiableList(testInstances);
+ }
+
+ List<TestConfig> getTestConfigs()
+ {
+ return Collections.unmodifiableList(_tests);
+ }
+
+ public int getTotalNumberOfClients()
+ {
+ int numberOfClients = 0;
+ for (TestConfig testConfig : _tests)
+ {
+ numberOfClients = Math.max(testConfig.getTotalNumberOfClients(), numberOfClients);
+ }
+ return numberOfClients;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java
new file mode 100644
index 0000000000..6288b42eac
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.Reader;
+
+import org.apache.qpid.disttest.client.property.PropertyValue;
+import org.apache.qpid.disttest.json.PropertyValueAdapter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class ConfigReader
+{
+
+ public Config getConfigFromFile(String fileName) throws FileNotFoundException
+ {
+ FileReader reader = new FileReader(fileName);
+
+ Config config = readConfig(reader);
+ return config;
+ }
+
+ public Config readConfig(Reader reader)
+ {
+ Gson gson = new GsonBuilder()
+ .registerTypeAdapter(PropertyValue.class, new PropertyValueAdapter())
+ .create();
+ Config config = gson.fromJson(reader, Config.class);
+ return config;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java
new file mode 100644
index 0000000000..e2cc31e21e
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
+
+public class ConnectionConfig
+{
+ private String _name;
+ private List<SessionConfig> _sessions;
+ private String _factory;
+
+ // For Gson
+ public ConnectionConfig()
+ {
+ super();
+ _sessions = Collections.emptyList();
+ }
+
+ public ConnectionConfig(String name, String factory, SessionConfig... sessions)
+ {
+ super();
+ _name = name;
+ _factory = factory;
+ _sessions = Arrays.asList(sessions);
+
+ }
+
+ public List<SessionConfig> getSessions()
+ {
+ return Collections.unmodifiableList(_sessions);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public List<Command> createCommands()
+ {
+ List<Command> commands = new ArrayList<Command>();
+ commands.add(createCommand());
+ for (SessionConfig sessionConfig : _sessions)
+ {
+ commands.addAll(sessionConfig.createCommands(_name));
+ }
+ return commands;
+ }
+
+ private CreateConnectionCommand createCommand()
+ {
+ CreateConnectionCommand command = new CreateConnectionCommand();
+ command.setConnectionName(_name);
+ command.setConnectionFactoryName(_factory);
+ return command;
+ }
+
+ public int getTotalNumberOfParticipants()
+ {
+ int numOfParticipants = 0;
+
+ for (SessionConfig sessionConfig : _sessions)
+ {
+ numOfParticipants = numOfParticipants + sessionConfig.getTotalNumberOfParticipants();
+ }
+ return numOfParticipants;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
new file mode 100644
index 0000000000..ed47e02667
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
@@ -0,0 +1,65 @@
+package org.apache.qpid.disttest.controller.config;
+
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+
+public class ConsumerConfig extends ParticipantConfig
+{
+ private boolean _isTopic;
+ private boolean _isDurableSubscription;
+ private boolean _isBrowsingSubscription;
+ private String _selector;
+ private boolean _noLocal;
+ private boolean _synchronous;
+
+ // For Gson
+ public ConsumerConfig()
+ {
+ _isTopic = false;
+ _isDurableSubscription = false;
+ _isBrowsingSubscription = false;
+ _selector = null;
+ _noLocal = false;
+ _synchronous = true;
+ }
+
+ public ConsumerConfig(
+ String consumerName,
+ String destinationName,
+ long numberOfMessages,
+ int batchSize,
+ long maximumDuration,
+ boolean isTopic,
+ boolean isDurableSubscription,
+ boolean isBrowsingSubscription,
+ String selector,
+ boolean noLocal,
+ boolean synchronous)
+ {
+ super(consumerName, destinationName, numberOfMessages, batchSize, maximumDuration);
+
+ _isTopic = isTopic;
+ _isDurableSubscription = isDurableSubscription;
+ _isBrowsingSubscription = isBrowsingSubscription;
+ _selector = selector;
+ _noLocal = noLocal;
+ _synchronous = synchronous;
+ }
+
+ public CreateConsumerCommand createCommand(String sessionName)
+ {
+ CreateConsumerCommand createConsumerCommand = new CreateConsumerCommand();
+
+ setParticipantProperties(createConsumerCommand);
+
+ createConsumerCommand.setSessionName(sessionName);
+ createConsumerCommand.setTopic(_isTopic);
+ createConsumerCommand.setDurableSubscription(_isDurableSubscription);
+ createConsumerCommand.setBrowsingSubscription(_isBrowsingSubscription);
+ createConsumerCommand.setSelector(_selector);
+ createConsumerCommand.setNoLocal(_noLocal);
+ createConsumerCommand.setSynchronous(_synchronous);
+
+ return createConsumerCommand;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java
new file mode 100644
index 0000000000..ef953a5d07
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.beanutils.BeanUtilsBean;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.message.Command;
+
+public class IterationValue
+{
+ private final Map<String, String> _iterationPropertyValuesWithUnderscores;
+
+ public IterationValue(Map<String, String> iterationMap)
+ {
+ _iterationPropertyValuesWithUnderscores = iterationMap;
+ }
+
+ public IterationValue()
+ {
+ _iterationPropertyValuesWithUnderscores = Collections.emptyMap();
+ }
+
+ public Map<String, String> getIterationPropertyValuesWithUnderscores()
+ {
+ return _iterationPropertyValuesWithUnderscores;
+ }
+
+ public void applyToCommand(Command command)
+ {
+ try
+ {
+ Map<String, String> withoutUnderscoresToMatchCommandPropertyNames = getIterationPropertyValuesWithoutUnderscores();
+ BeanUtilsBean.getInstance().copyProperties(command, withoutUnderscoresToMatchCommandPropertyNames);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("Couldn't copy properties from iteration " + this + " to " + command, e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new RuntimeException("Couldn't copy properties from iteration " + this + " to " + command, e);
+ }
+ }
+
+ private Map<String, String> getIterationPropertyValuesWithoutUnderscores()
+ {
+ Map<String, String> iterationPropertyValues = new HashMap<String, String>();
+ for (String propertyNameWithUnderscore : _iterationPropertyValuesWithUnderscores.keySet())
+ {
+ String propertyName = propertyNameWithUnderscore.replaceFirst("_", "");
+ String propertyValue = _iterationPropertyValuesWithUnderscores.get(propertyNameWithUnderscore);
+
+ iterationPropertyValues.put(propertyName, propertyValue);
+ }
+ return iterationPropertyValues;
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("iterationMap", _iterationPropertyValuesWithUnderscores).toString();
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java
new file mode 100644
index 0000000000..318ec7f045
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.Map;
+
+import org.apache.qpid.disttest.client.property.PropertyValue;
+import org.apache.qpid.disttest.message.CreateMessageProviderCommand;
+
+public class MessageProviderConfig
+{
+ private String _name;
+ private Map<String, PropertyValue> _messageProperties;
+
+ public MessageProviderConfig()
+ {
+ super();
+ }
+
+ public MessageProviderConfig(String name, Map<String, PropertyValue> messageProperties)
+ {
+ super();
+ _name = name;
+ _messageProperties = messageProperties;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public Map<String, PropertyValue> getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+ public CreateMessageProviderCommand createCommand()
+ {
+ CreateMessageProviderCommand command = new CreateMessageProviderCommand();
+ command.setProviderName(_name);
+ command.setMessageProperties(_messageProperties);
+ return command;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
new file mode 100644
index 0000000000..31037a3038
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import org.apache.qpid.disttest.message.CreateParticpantCommand;
+
+public abstract class ParticipantConfig
+{
+ private String _destinationName;
+ private long _numberOfMessages;
+ private String _name;
+ private int _batchSize;
+ private long _maximumDuration;
+
+ // For GSON
+ public ParticipantConfig()
+ {
+ _name = null;
+ _destinationName = null;
+ _numberOfMessages = 0;
+ _batchSize = 0;
+ _maximumDuration = 0;
+ }
+
+ public ParticipantConfig(
+ String name,
+ String destinationName,
+ long numberOfMessages,
+ int batchSize,
+ long maximumDuration)
+ {
+ _name = name;
+ _destinationName = destinationName;
+ _numberOfMessages = numberOfMessages;
+ _batchSize = batchSize;
+ _maximumDuration = maximumDuration;
+ }
+
+ protected void setParticipantProperties(CreateParticpantCommand createParticipantCommand)
+ {
+ createParticipantCommand.setParticipantName(_name);
+ createParticipantCommand.setDestinationName(_destinationName);
+ createParticipantCommand.setNumberOfMessages(_numberOfMessages);
+ createParticipantCommand.setBatchSize(_batchSize);
+ createParticipantCommand.setMaximumDuration(_maximumDuration);
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
new file mode 100644
index 0000000000..7806528a8c
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import javax.jms.Message;
+
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+
+public class ProducerConfig extends ParticipantConfig
+{
+ private int _deliveryMode;
+ private int _messageSize;
+ private int _priority;
+ private long _timeToLive;
+ private long _interval;
+ private long _startDelay;
+ private String _messageProviderName;
+
+ // For Gson
+ public ProducerConfig()
+ {
+ _deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+ _messageSize = 0;
+ _priority = Message.DEFAULT_PRIORITY;
+ _timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ _interval = 0;
+ _startDelay = 0;
+ _messageProviderName = null;
+ }
+
+ public ProducerConfig(
+ String producerName,
+ String destinationName,
+ long numberOfMessages,
+ int batchSize,
+ long maximumDuration,
+ int deliveryMode,
+ int messageSize,
+ int priority,
+ long timeToLive,
+ long interval,
+ long startDelay,
+ String messageProviderName)
+ {
+ super(producerName, destinationName, numberOfMessages, batchSize, maximumDuration);
+
+ _deliveryMode = deliveryMode;
+ _messageSize = messageSize;
+ _priority = priority;
+ _timeToLive = timeToLive;
+ _interval = interval;
+ _startDelay = startDelay;
+ _messageProviderName = messageProviderName;
+ }
+
+ public CreateProducerCommand createCommand(String sessionName)
+ {
+ CreateProducerCommand command = new CreateProducerCommand();
+
+ setParticipantProperties(command);
+
+ command.setSessionName(sessionName);
+ command.setDeliveryMode(_deliveryMode);
+ command.setMessageSize(_messageSize);
+ command.setPriority(_priority);
+ command.setTimeToLive(_timeToLive);
+ command.setInterval(_interval);
+ command.setStartDelay(_startDelay);
+ command.setMessageProviderName(_messageProviderName);
+
+ return command;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java
new file mode 100644
index 0000000000..cffc2b7c50
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+public class QueueConfig
+{
+ private String _name;
+ private boolean _durable;
+ private Map<String, Object> _attributes;
+
+ public QueueConfig()
+ {
+ super();
+ _attributes = Collections.emptyMap();
+ }
+
+ public QueueConfig(String name, boolean durable, Map<String, Object> attributes)
+ {
+ super();
+ this._name = name;
+ this._durable = durable;
+ this._attributes = attributes;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ // TODO x-qpid-capacity and x-qpid-flow-resume-capacity need to be typed as numeric but we currrently
+ // pass these as a string.
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ public boolean isDurable()
+ {
+ return _durable;
+ }
+
+ @Override
+ public String toString()
+ {
+ return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java
new file mode 100644
index 0000000000..12372e5391
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import javax.jms.Session;
+
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CreateSessionCommand;
+
+public class SessionConfig
+{
+ private static final List<ProducerConfig> EMPTY_PRODUCER_LIST = Collections.emptyList();
+ private static final List<ConsumerConfig> EMPTY_CONSUMER_LIST = Collections.emptyList();
+
+ private int _acknowledgeMode;
+ private String _sessionName;
+ private List<ProducerConfig> _producers;
+ private List<ConsumerConfig> _consumers;
+
+ // For Gson
+ public SessionConfig()
+ {
+ this(null, Session.SESSION_TRANSACTED, EMPTY_CONSUMER_LIST, EMPTY_PRODUCER_LIST);
+ }
+
+ public SessionConfig(String sessionName, int acknowledgeMode, ProducerConfig...producers)
+ {
+ this(sessionName, acknowledgeMode, EMPTY_CONSUMER_LIST, Arrays.asList(producers));
+ }
+
+ public SessionConfig(String sessionName, int acknowledgeMode, ConsumerConfig... consumers)
+ {
+ this(sessionName, acknowledgeMode, Arrays.asList(consumers), EMPTY_PRODUCER_LIST);
+ }
+
+ public SessionConfig(String sessionName, int acknowledgeMode, List<ConsumerConfig> consumers, List<ProducerConfig> producers)
+ {
+ _sessionName = sessionName;
+ _acknowledgeMode = acknowledgeMode;
+ _consumers = consumers;
+ _producers = producers;
+ }
+
+ public int getAcknowledgeMode()
+ {
+ return _acknowledgeMode;
+ }
+
+ public String getSessionName()
+ {
+ return _sessionName;
+ }
+
+ public List<ProducerConfig> getProducers()
+ {
+ return Collections.unmodifiableList(_producers);
+ }
+
+ public List<ConsumerConfig> getConsumers()
+ {
+ return Collections.unmodifiableList(_consumers);
+ }
+
+ public List<Command> createCommands(String connectionName)
+ {
+ List<Command> commands = new ArrayList<Command>();
+ commands.add(createCommand(connectionName));
+ for (ProducerConfig producer : _producers)
+ {
+ commands.add(producer.createCommand(_sessionName));
+ }
+ for (ConsumerConfig consumer : _consumers)
+ {
+ commands.add(consumer.createCommand(_sessionName));
+ }
+ return commands;
+ }
+
+ private CreateSessionCommand createCommand(String connectionName)
+ {
+ CreateSessionCommand command = new CreateSessionCommand();
+ command.setAcknowledgeMode(_acknowledgeMode);
+ command.setConnectionName(connectionName);
+ command.setSessionName(_sessionName);
+ return command;
+ }
+
+ public int getTotalNumberOfParticipants()
+ {
+ return _producers.size() + _consumers.size();
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java
new file mode 100644
index 0000000000..2bb5f1b289
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.disttest.controller.CommandForClient;
+
+public class TestConfig
+{
+ private final String _name;
+
+ private final List<ClientConfig> _clients;
+
+ private final List<QueueConfig> _queues;
+
+ private final List<Map<String, String>> _iterations;
+
+ public TestConfig()
+ {
+ _clients = Collections.emptyList();
+ _queues = Collections.emptyList();
+ _name = null;
+ _iterations = Collections.emptyList();
+ }
+
+ public TestConfig(String name, ClientConfig[] clients, QueueConfig[] queues)
+ {
+ _clients = Arrays.asList(clients);
+ _queues = Arrays.asList(queues);
+ _name = name;
+ _iterations = Collections.emptyList();
+ }
+
+ public List<String> getClientNames()
+ {
+ List<String> clientNames = new ArrayList<String>();
+ for (ClientConfig clientConfig : _clients)
+ {
+ clientNames.add(clientConfig.getName());
+ }
+ return clientNames;
+ }
+
+ public int getTotalNumberOfClients()
+ {
+ return _clients.size();
+ }
+
+ public int getTotalNumberOfParticipants()
+ {
+ int numOfParticipants = 0;
+ for (ClientConfig client : _clients)
+ {
+ numOfParticipants = numOfParticipants + client.getTotalNumberOfParticipants();
+ }
+ return numOfParticipants;
+ }
+
+ public List<CommandForClient> createCommands()
+ {
+ List<CommandForClient> commandsForClients = new ArrayList<CommandForClient>();
+ for (ClientConfig client : _clients)
+ {
+ commandsForClients.addAll(client.createCommands());
+ }
+
+ return Collections.unmodifiableList(commandsForClients);
+ }
+
+ public List<QueueConfig> getQueues()
+ {
+ return Collections.unmodifiableList(_queues);
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public List<IterationValue> getIterationValues()
+ {
+ List<IterationValue> iterationValues = new ArrayList<IterationValue>();
+ for (Map<String, String> iterationMap : _iterations)
+ {
+ iterationValues.add(new IterationValue(iterationMap));
+ }
+
+ return iterationValues;
+ }
+
+ public List<ClientConfig> getClients()
+ {
+ return Collections.unmodifiableList(_clients);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java
new file mode 100644
index 0000000000..9f555ef4da
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.controller.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.controller.CommandForClient;
+import org.apache.qpid.disttest.message.Command;
+
+public class TestInstance
+{
+ private static final IterationValue EMPTY_ITERATION_VALUES = new IterationValue();
+
+ private TestConfig _testConfig;
+ private IterationValue _iterationValue;
+ private int _iterationNumber;
+
+ public TestInstance(TestConfig testConfig, int iterationNumber, IterationValue iterationValue)
+ {
+ _testConfig = testConfig;
+ _iterationNumber = iterationNumber;
+ _iterationValue = iterationValue;
+ }
+
+ public TestInstance(TestConfig testConfig)
+ {
+ this(testConfig, 0, EMPTY_ITERATION_VALUES);
+ }
+
+ public List<CommandForClient> createCommands()
+ {
+ List<CommandForClient> commands = _testConfig.createCommands();
+ List<CommandForClient> newCommands = new ArrayList<CommandForClient>(commands.size());
+
+ for (CommandForClient commandForClient : commands)
+ {
+ String clientName = commandForClient.getClientName();
+ Command command = commandForClient.getCommand();
+
+ _iterationValue.applyToCommand(command);
+
+ newCommands.add(new CommandForClient(clientName, command));
+ }
+
+ return newCommands;
+
+ }
+
+ public String getName()
+ {
+ return _testConfig.getName();
+ }
+
+ public int getIterationNumber()
+ {
+ return _iterationNumber;
+ }
+
+ public int getTotalNumberOfParticipants()
+ {
+ return _testConfig.getTotalNumberOfParticipants();
+ }
+
+ public List<QueueConfig> getQueues()
+ {
+ return _testConfig.getQueues();
+ }
+
+ public List<String> getClientNames()
+ {
+ return _testConfig.getClientNames();
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("testName", getName())
+ .append("iterationNumber", _iterationNumber)
+ .toString();
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
new file mode 100644
index 0000000000..a3c2c57473
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.DistributedTestConstants;
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.client.MessageProvider;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.CreateMessageProviderCommand;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.CreateSessionCommand;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.apache.qpid.disttest.message.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientJmsDelegate
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientJmsDelegate.class);
+
+ private final Context _context;
+ private final Destination _controllerQueue;
+ private final Connection _controllerConnection;
+ private final Session _controllerSession;
+ private final MessageProducer _controlQueueProducer;
+
+ private final String _clientName;
+ private Queue _instructionQueue;
+
+ private Map<String, Connection> _testConnections;
+ private Map<String, Session> _testSessions;
+ private Map<String, MessageProducer> _testProducers;
+ private Map<String, MessageConsumer> _testConsumers;
+ private Map<String, MessageProvider> _testMessageProviders;
+
+ private final MessageProvider _defaultMessageProvider;
+
+ public ClientJmsDelegate(final Context context)
+ {
+ try
+ {
+ _context = context;
+ final ConnectionFactory connectionFactory = (ConnectionFactory) _context.lookup("connectionfactory");
+ _controllerConnection = connectionFactory.createConnection();
+ _controllerConnection.start();
+ _controllerQueue = (Destination) context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME);
+ _controllerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _controlQueueProducer = _controllerSession.createProducer(_controllerQueue);
+ _clientName = UUID.randomUUID().toString();
+ _testConnections = new HashMap<String, Connection>();
+ _testSessions = new HashMap<String, Session>();
+ _testProducers = new HashMap<String, MessageProducer>();
+ _testConsumers = new HashMap<String, MessageConsumer>();
+ _testMessageProviders = new HashMap<String, MessageProvider>();
+ _defaultMessageProvider = new MessageProvider(null);
+ }
+ catch (final NamingException ne)
+ {
+ throw new DistributedTestException("Unable to create client jms delegate", ne);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to create client jms delegate", jmse);
+ }
+ }
+
+ public void setInstructionListener(final Client client)
+ {
+ try
+ {
+ _instructionQueue = _controllerSession.createTemporaryQueue();
+ final MessageConsumer instructionConsumer = _controllerSession.createConsumer(_instructionQueue);
+ instructionConsumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(final Message message)
+ {
+ client.processInstruction(JmsMessageAdaptor.messageToCommand(message));
+ }
+ });
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to setup instruction listener", jmse);
+ }
+ }
+
+ public void sendRegistrationMessage()
+ {
+ Command command;
+ try
+ {
+ command = new RegisterClientCommand(_clientName, _instructionQueue.getQueueName());
+ }
+ catch (final JMSException e)
+ {
+ throw new DistributedTestException(e);
+ }
+ sendCommand(command);
+ }
+
+ public void sendResponseMessage(final Response responseMessage)
+ {
+ sendCommand(responseMessage);
+ }
+
+ private void sendCommand(final Command command)
+ {
+ try
+ {
+ final Message message = JmsMessageAdaptor.commandToMessage(_controllerSession, command);
+ _controlQueueProducer.send(message);
+ LOGGER.debug("Sent message for " + command.getType() + ". message id: " + message.getJMSMessageID());
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to send command: " + command, jmse);
+ }
+ }
+
+ public void createConnection(final CreateConnectionCommand command)
+ {
+ try
+ {
+ final ConnectionFactory connectionFactory = (ConnectionFactory) _context.lookup(command
+ .getConnectionFactoryName());
+ final Connection newConnection = connectionFactory.createConnection();
+ addConnection(command.getConnectionName(), newConnection);
+ }
+ catch (final NamingException ne)
+ {
+ throw new DistributedTestException("Unable to lookup factoryName: " + command.getConnectionFactoryName(),
+ ne);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to create connection: " + command.getConnectionName()
+ + " (using factory name: " + command.getConnectionFactoryName() + ")", jmse);
+ }
+ }
+
+ public void createSession(final CreateSessionCommand command)
+ {
+ try
+ {
+ final Connection connection = _testConnections.get(command.getConnectionName());
+ if (connection == null)
+ {
+ throw new DistributedTestException("No test connection found called: " + command.getConnectionName(),
+ command);
+ }
+ final boolean transacted = command.getAcknowledgeMode() == Session.SESSION_TRANSACTED;
+ final Session newSession = connection.createSession(transacted, command.getAcknowledgeMode());
+ addSession(command.getSessionName(), newSession);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to create new session: " + command, jmse);
+ }
+ }
+
+ public void createProducer(final CreateProducerCommand command)
+ {
+ try
+ {
+ final Session session = _testSessions.get(command.getSessionName());
+ if (session == null)
+ {
+ throw new DistributedTestException("No test session found called: " + command.getSessionName(), command);
+ }
+ final Destination destination = session.createQueue(command.getDestinationName());
+ final MessageProducer jmsProducer = session.createProducer(destination);
+ if (command.getPriority() != -1)
+ {
+ jmsProducer.setPriority(command.getPriority());
+ }
+ if (command.getTimeToLive() > 0)
+ {
+ jmsProducer.setTimeToLive(command.getTimeToLive());
+ }
+
+ if (command.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
+ || command.getDeliveryMode() == DeliveryMode.PERSISTENT)
+ {
+ jmsProducer.setDeliveryMode(command.getDeliveryMode());
+ }
+
+ addProducer(command.getParticipantName(), jmsProducer);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to create new producer: " + command, jmse);
+ }
+
+ }
+
+ public void createConsumer(final CreateConsumerCommand command)
+ {
+ try
+ {
+ final Session session = _testSessions.get(command.getSessionName());
+ if (session == null)
+ {
+ throw new DistributedTestException("No test session found called: " + command.getSessionName(), command);
+ }
+ final Destination destination = command.isTopic() ? session.createTopic(command.getDestinationName())
+ : session.createQueue(command.getDestinationName());
+ final MessageConsumer jmsConsumer = session.createConsumer(destination, command.getSelector());
+
+ _testConsumers.put(command.getParticipantName(), jmsConsumer);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to create new consumer: " + command, jmse);
+ }
+ }
+
+ /**
+ * destroy the client. Don't call from the Dispatcher thread.
+ */
+ public void destroy()
+ {
+ try
+ {
+ // Stopping the connection allows in-flight onMessage calls to
+ // finish.
+ _controllerConnection.stop();
+
+ if (_controllerSession != null)
+ {
+ _controllerSession.close();
+ }
+ if (_controllerConnection != null)
+ {
+ _controllerConnection.close();
+ }
+
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to destroy cleanly", jmse);
+ }
+ }
+
+ public Destination getControllerQueue()
+ {
+ return _controllerQueue;
+ }
+
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ public int getNoOfTestConnections()
+ {
+ return _testConnections.size();
+ }
+
+ public int getNoOfTestSessions()
+ {
+ return _testSessions.size();
+ }
+
+ public int getNoOfTestProducers()
+ {
+ return _testProducers.size();
+ }
+
+ public int getNoOfTestConsumers()
+ {
+ return _testConsumers.size();
+ }
+
+ public void startConnections()
+ {
+ // start connections for consumers
+ // it would be better if we could track consumer connections and start
+ // only those
+ if (!_testConsumers.isEmpty())
+ {
+ for (final Map.Entry<String, Connection> entry : _testConnections.entrySet())
+ {
+ final Connection connection = entry.getValue();
+ try
+ {
+ connection.start();
+ }
+ catch (final JMSException e)
+ {
+ throw new DistributedTestException("Failed to start connection '" + entry.getKey() + "' :"
+ + e.getLocalizedMessage());
+ }
+ }
+ }
+ }
+
+ public void commitOrAcknowledgeMessage(final Message message, final String sessionName)
+ {
+ try
+ {
+ final Session session = _testSessions.get(sessionName);
+ if (session.getTransacted())
+ {
+ session.commit();
+ }
+ else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ message.acknowledge();
+ }
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to commit or acknowledge message on session: " +
+ sessionName, jmse);
+ }
+ }
+
+ public Message sendNextMessage(final CreateProducerCommand command)
+ {
+ Message sentMessage = null;
+ MessageProvider messageProvider = _testMessageProviders.get(command.getMessageProviderName());
+ if (messageProvider == null)
+ {
+ messageProvider = _defaultMessageProvider;
+ }
+
+ final Session session = _testSessions.get(command.getSessionName());
+ final MessageProducer producer = _testProducers.get(command.getParticipantName());
+ try
+ {
+ sentMessage = messageProvider.nextMessage(session, command);
+ int deliveryMode = producer.getDeliveryMode();
+ int priority = producer.getPriority();
+ long ttl = producer.getTimeToLive();
+ if (messageProvider.isPropertySet(MessageProvider.PRIORITY))
+ {
+ priority = sentMessage.getJMSPriority();
+ }
+ if (messageProvider.isPropertySet(MessageProvider.DELIVERY_MODE))
+ {
+ deliveryMode = sentMessage.getJMSDeliveryMode();
+ }
+ if (messageProvider.isPropertySet(MessageProvider.TTL))
+ {
+ ttl = sentMessage.getLongProperty(MessageProvider.TTL);
+ }
+ producer.send(sentMessage, deliveryMode, priority, ttl);
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to create and send message with producer: " +
+ command.getParticipantName() + " on session: " + command.getSessionName(), jmse);
+ }
+ return sentMessage;
+ }
+
+ protected void addSession(final String sessionName, final Session newSession)
+ {
+ _testSessions.put(sessionName, newSession);
+ }
+
+ protected void addConnection(final String connectionName, final Connection newConnection)
+ {
+ _testConnections.put(connectionName, newConnection);
+ }
+
+ protected void addProducer(final String producerName, final MessageProducer jmsProducer)
+ {
+ _testProducers.put(producerName, jmsProducer);
+ }
+
+ public Message consumeMessage(String consumerName, long receiveInterval)
+ {
+ Message consumedMessage = null;
+ MessageConsumer consumer = _testConsumers.get(consumerName);
+ try
+ {
+ consumedMessage = consumer.receive(receiveInterval);
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Unable to consume message with consumer: " + consumerName, e);
+ }
+ return consumedMessage;
+ }
+
+ public void registerListener(String consumerName, MessageListener messageListener)
+ {
+ MessageConsumer consumer = _testConsumers.get(consumerName);
+ try
+ {
+ consumer.setMessageListener(messageListener);
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Unable to register message listener with consumer: " + consumerName, e);
+ }
+ }
+
+ public void rollbackOrRecover(String sessionName)
+ {
+ try
+ {
+ final Session session = _testSessions.get(sessionName);
+ if (session.getTransacted())
+ {
+ session.rollback();
+ }
+ else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ session.recover();
+ }
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to rollback or recover on session: " +
+ sessionName, jmse);
+ }
+ }
+
+ public void releaseMessage(String sessionName)
+ {
+ try
+ {
+ final Session session = _testSessions.get(sessionName);
+ if (session.getTransacted())
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.recover();
+ }
+ }
+ catch (final JMSException jmse)
+ {
+ LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("clientName", _clientName).toString();
+ }
+
+ public void closeTestConnections()
+ {
+ StringBuilder jmsErrorMessages = new StringBuilder();
+ int failedCloseCounter = 0;
+ for (final Map.Entry<String, Connection> entry : _testConnections.entrySet())
+ {
+ final Connection connection = entry.getValue();
+ try
+ {
+ connection.close();
+ }
+ catch (final JMSException e)
+ {
+ LOGGER.error("Failed to close connection '" + entry.getKey() + "' :" + e.getLocalizedMessage(), e);
+ failedCloseCounter++;
+ if (jmsErrorMessages.length() > 0)
+ {
+ jmsErrorMessages.append('\n');
+ }
+ jmsErrorMessages.append(e.getMessage());
+ }
+ }
+ _testConnections.clear();
+ _testSessions.clear();
+ _testProducers.clear();
+ _testConsumers.clear();
+ if (failedCloseCounter > 0)
+ {
+ throw new DistributedTestException("Close failed for " + failedCloseCounter + " connection(s) with the following errors: " + jmsErrorMessages.toString());
+ }
+ }
+
+ public void closeTestConsumer(String consumerName)
+ {
+ MessageConsumer consumer = _testConsumers.get(consumerName);
+ if (consumer != null)
+ {
+ try
+ {
+ consumer.close();
+ LOGGER.info("Closed test consumer " + consumerName);
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to close consumer: " + consumerName, e);
+ }
+ }
+ }
+
+ public void closeTestProducer(String producerName)
+ {
+ MessageProducer producer = _testProducers.get(producerName);
+ if (producer != null)
+ {
+ try
+ {
+ producer.close();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to close producer: " + producerName, e);
+ }
+ }
+ }
+
+ public int calculatePayloadSizeFrom(Message message)
+ {
+ try
+ {
+ if (message != null && message instanceof TextMessage)
+ {
+ return ((TextMessage) message).getText().getBytes().length;
+ }
+ // TODO support other message types
+ return 0;
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Unable to determine the payload size for message " + message, e);
+ }
+ }
+
+ public void createMessageProvider(CreateMessageProviderCommand command)
+ {
+ _testMessageProviders.put(command.getProviderName(), new MessageProvider(command.getMessageProperties()));
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
new file mode 100644
index 0000000000..3bf8e6ff4a
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.jms;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.CommandListener;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ControllerJmsDelegate
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJmsDelegate.class);
+
+ private final Map<String, Destination> _clientNameToQueueMap = new ConcurrentHashMap<String, Destination>();
+ private final Connection _connection;
+ private final Destination _controllerQueue;
+ private final Session _session;
+ private final QueueCreator _queueCreator;
+
+ private List<CommandListener> _commandListeners = new CopyOnWriteArrayList<CommandListener>();
+
+ public ControllerJmsDelegate(final Context context) throws NamingException, JMSException
+ {
+ final ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("connectionfactory");
+ _connection = connectionFactory.createConnection();
+ _connection.start();
+ _controllerQueue = (Destination) context.lookup("controllerqueue");
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _queueCreator = new QpidQueueCreator();
+ }
+
+ public void start()
+ {
+ try
+ {
+ final MessageConsumer consumer = _session.createConsumer(_controllerQueue);
+ consumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(final Message message)
+ {
+ try
+ {
+ String jmsMessageID = message.getJMSMessageID();
+ LOGGER.debug("Received message " + jmsMessageID);
+
+ final Command command = JmsMessageAdaptor.messageToCommand(message);
+ LOGGER.debug("Converted message " + jmsMessageID + " into command: " + command);
+
+ processCommandWithFirstSupportingListener(command);
+ LOGGER.debug("Finished processing command for message " + jmsMessageID);
+ }
+ catch (Throwable t)
+ {
+ LOGGER.error("Can't handle JMS message", t);
+ }
+ }
+ });
+ }
+ catch (final JMSException e)
+ {
+ throw new DistributedTestException(e);
+ }
+ }
+
+ /** ensures connections are closed, otherwise the JVM may be prevented from terminating */
+ public void closeConnections()
+ {
+ try
+ {
+ _session.close();
+ }
+ catch (JMSException e)
+ {
+ LOGGER.error("Unable to close session", e);
+ }
+
+ try
+ {
+ _connection.stop();
+ }
+ catch (JMSException e)
+ {
+ LOGGER.error("Unable to stop connection", e);
+ }
+
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Unable to close connection", e);
+ }
+ }
+
+ public void registerClient(final RegisterClientCommand command)
+ {
+ final String clientName = command.getClientName();
+ final Destination clientIntructionQueue = createDestinationFromString(command.getClientQueueName());
+ _clientNameToQueueMap.put(clientName, clientIntructionQueue);
+ }
+
+ public void sendCommandToClient(final String clientName, final Command command)
+ {
+ final Destination clientQueue = _clientNameToQueueMap.get(clientName);
+ if (clientQueue == null)
+ {
+ throw new DistributedTestException("Client name " + clientName + " not known. I know about: "
+ + _clientNameToQueueMap.keySet());
+ }
+
+ try
+ {
+ final MessageProducer producer = _session.createProducer(clientQueue);
+ final Message message = JmsMessageAdaptor.commandToMessage(_session, command);
+
+ producer.send(message);
+ }
+ catch (final JMSException e)
+ {
+ throw new DistributedTestException(e);
+ }
+ }
+
+ private void processCommandWithFirstSupportingListener(Command command)
+ {
+ for (CommandListener listener : _commandListeners)
+ {
+ if (listener.supports(command))
+ {
+ listener.processCommand(command);
+ return;
+ }
+ }
+
+ throw new IllegalStateException("There is no registered listener to process command " + command);
+ }
+
+ private Destination createDestinationFromString(final String clientQueueName)
+ {
+ Destination clientIntructionQueue;
+ try
+ {
+ clientIntructionQueue = _session.createQueue(clientQueueName);
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Unable to create Destination from " + clientQueueName);
+ }
+ return clientIntructionQueue;
+ }
+
+ public void createQueues(List<QueueConfig> queues)
+ {
+ _queueCreator.createQueues(_connection, queues);
+ }
+
+ public void deleteQueues(List<QueueConfig> queues)
+ {
+ _queueCreator.deleteQueues(_connection, queues);
+ }
+
+ public void addCommandListener(CommandListener commandListener)
+ {
+ _commandListeners.add(commandListener);
+ }
+
+ public void removeCommandListener(CommandListener commandListener)
+ {
+ _commandListeners.remove(commandListener);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java
new file mode 100644
index 0000000000..c9dba21a74
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.qpid.disttest.DistributedTestConstants;
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.json.JsonHandler;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.CreateMessageProviderCommand;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.CreateResponderCommand;
+import org.apache.qpid.disttest.message.CreateSessionCommand;
+import org.apache.qpid.disttest.message.NoOpCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.ProducerParticipantResult;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.apache.qpid.disttest.message.Response;
+import org.apache.qpid.disttest.message.StartTestCommand;
+import org.apache.qpid.disttest.message.StopClientCommand;
+import org.apache.qpid.disttest.message.TearDownTestCommand;
+
+public class JmsMessageAdaptor
+{
+ public static Message commandToMessage(final Session session, final Command command)
+ {
+ Message jmsMessage = null;
+ try
+ {
+ jmsMessage = session.createMessage();
+ jmsMessage.setStringProperty(DistributedTestConstants.MSG_COMMAND_PROPERTY, command.getType().name());
+ final JsonHandler jsonHandler = new JsonHandler();
+ jmsMessage.setStringProperty(DistributedTestConstants.MSG_JSON_PROPERTY, jsonHandler.marshall(command));
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to convert command " + command + " to JMS Message", jmse);
+ }
+
+ return jmsMessage;
+ }
+
+ public static Command messageToCommand(final Message jmsMessage)
+ {
+ Command command = null;
+ try
+ {
+ final CommandType commandType = CommandType.valueOf(jmsMessage
+ .getStringProperty(DistributedTestConstants.MSG_COMMAND_PROPERTY));
+ final JsonHandler jsonHandler = new JsonHandler();
+ command = jsonHandler.unmarshall(jmsMessage.getStringProperty(DistributedTestConstants.MSG_JSON_PROPERTY),
+ getCommandClassFromType(commandType));
+ }
+ catch (final JMSException jmse)
+ {
+ throw new DistributedTestException("Unable to convert JMS message " + jmsMessage + " to command object",
+ jmse);
+ }
+ return command;
+ }
+
+ static Class<? extends Command> getCommandClassFromType(final CommandType type)
+ {
+ switch (type)
+ {
+ case CREATE_CONNECTION:
+ return CreateConnectionCommand.class;
+ case CREATE_SESSION:
+ return CreateSessionCommand.class;
+ case CREATE_PRODUCER:
+ return CreateProducerCommand.class;
+ case CREATE_CONSUMER:
+ return CreateConsumerCommand.class;
+ case CREATE_RESPONDER:
+ return CreateResponderCommand.class;
+ case NO_OP:
+ return NoOpCommand.class;
+ case REGISTER_CLIENT:
+ return RegisterClientCommand.class;
+ case STOP_CLIENT:
+ return StopClientCommand.class;
+ case RESPONSE:
+ return Response.class;
+ case START_TEST:
+ return StartTestCommand.class;
+ case TEAR_DOWN_TEST:
+ return TearDownTestCommand.class;
+ case PARTICIPANT_RESULT:
+ return ParticipantResult.class;
+ case CONSUMER_PARTICIPANT_RESULT:
+ return ConsumerParticipantResult.class;
+ case PRODUCER_PARTICIPANT_RESULT:
+ return ProducerParticipantResult.class;
+ case CREATE_MESSAGE_PROVIDER:
+ return CreateMessageProviderCommand.class;
+ default:
+ throw new DistributedTestException("No class defined for type: " + type);
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
new file mode 100644
index 0000000000..6b21181b2a
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.jms;
+
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.framing.FieldTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QpidQueueCreator implements QueueCreator
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
+
+ private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
+
+ @Override
+ public void createQueues(Connection connection, List<QueueConfig> configs)
+ {
+ AMQSession<?, ?> session = createSession(connection);
+ for (QueueConfig queueConfig : configs)
+ {
+ createQueue(session, queueConfig);
+ }
+ }
+
+ @Override
+ public void deleteQueues(Connection connection, List<QueueConfig> configs)
+ {
+ AMQSession<?, ?> session = createSession(connection);
+ for (QueueConfig queueConfig : configs)
+ {
+ deleteQueue(session, queueConfig);
+ }
+ }
+
+ private void createQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+ {
+ try
+ {
+ AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
+ boolean autoDelete = false;
+ boolean exclusive = false;
+ session.createQueue(destination.getAMQQueueName(), autoDelete,
+ queueConfig.isDurable(), exclusive, queueConfig.getAttributes());
+ session.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(),
+ EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(),
+ destination, autoDelete);
+
+ LOGGER.info("Created queue " + queueConfig);
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to create queue:" + queueConfig, e);
+ }
+ }
+
+ private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+ {
+ try
+ {
+ // The Qpid AMQSession API currently makes the #deleteQueue method protected and the
+ // raw protocol method public. This should be changed then we should switch the below to
+ // use #deleteQueue.
+ AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
+ session.sendQueueDelete(destination.getAMQQueueName());
+ LOGGER.info("Deleted queue " + queueConfig.getName());
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
+ }
+ }
+
+ private AMQSession<?, ?> createSession(Connection connection)
+ {
+ try
+ {
+ return (AMQSession<?, ?>) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to create session!", e);
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
new file mode 100644
index 0000000000..dd9faef39f
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.jms;
+
+import java.util.List;
+
+import javax.jms.Connection;
+
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+
+public interface QueueCreator
+{
+ public void createQueues(final Connection connection, final List<QueueConfig> configs);
+ public void deleteQueues(final Connection connection, final List<QueueConfig> configs);
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java
new file mode 100644
index 0000000000..8e50cd4f11
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.json;
+
+import org.apache.qpid.disttest.client.property.PropertyValue;
+import org.apache.qpid.disttest.message.Command;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class JsonHandler
+{
+ private final Gson _gson = new GsonBuilder()
+ .registerTypeAdapter(PropertyValue.class, new PropertyValueAdapter())
+ .create();
+
+ public <T extends Command> T unmarshall(final String jsonParams, final Class<T> clazz)
+ {
+ return _gson.fromJson(jsonParams, clazz);
+ }
+
+ public <T extends Command> String marshall(final T command)
+ {
+ return _gson.toJson(command);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java
new file mode 100644
index 0000000000..94f712e652
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.json;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.disttest.client.property.GeneratedPropertyValue;
+import org.apache.qpid.disttest.client.property.PropertyValue;
+import org.apache.qpid.disttest.client.property.PropertyValueFactory;
+import org.apache.qpid.disttest.client.property.SimplePropertyValue;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+public class PropertyValueAdapter implements JsonDeserializer<PropertyValue>, JsonSerializer<PropertyValue>
+{
+ private static final String DEF_FIELD = "@def";
+ private PropertyValueFactory _factory = new PropertyValueFactory();
+
+ @Override
+ public PropertyValue deserialize(JsonElement json, Type type, JsonDeserializationContext context)
+ throws JsonParseException
+ {
+ if (json.isJsonNull())
+ {
+ return null;
+ }
+ else if (json.isJsonPrimitive())
+ {
+ Object result = null;
+ JsonPrimitive primitive = json.getAsJsonPrimitive();
+ if (primitive.isString())
+ {
+ result = primitive.getAsString();
+ }
+ else if (primitive.isNumber())
+ {
+ String asString = primitive.getAsString();
+ if (asString.indexOf('.') != -1 || asString.indexOf('e') != -1)
+ {
+ result = primitive.getAsDouble();
+ }
+ else
+ {
+ result = primitive.getAsLong();
+ }
+ }
+ else if (primitive.isBoolean())
+ {
+ result = primitive.getAsBoolean();
+ }
+ else
+ {
+ throw new JsonParseException("Unsupported primitive value " + primitive);
+ }
+ return new SimplePropertyValue(result);
+ }
+ else if (json.isJsonArray())
+ {
+ JsonArray array = json.getAsJsonArray();
+ List<Object> result = new ArrayList<Object>(array.size());
+ for (JsonElement element : array)
+ {
+ result.add(context.deserialize(element, Object.class));
+ }
+ return new SimplePropertyValue(result);
+ }
+ else if (json.isJsonObject())
+ {
+ JsonObject object = json.getAsJsonObject();
+ JsonElement defElement = object.getAsJsonPrimitive(DEF_FIELD);
+ Class<?> classInstance = null;
+ if (defElement != null)
+ {
+ try
+ {
+ classInstance = _factory.getPropertyValueClass(defElement.getAsString());
+ }
+ catch (ClassNotFoundException e)
+ {
+ // ignore
+ }
+ }
+ if (classInstance == null)
+ {
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (Map.Entry<String, JsonElement> entry : object.entrySet())
+ {
+ Object value = context.deserialize(entry.getValue(), Object.class);
+ result.put(entry.getKey(), value);
+ }
+ return new SimplePropertyValue(result);
+ }
+ else
+ {
+ return context.deserialize(json, classInstance);
+ }
+ }
+ else
+ {
+ throw new JsonParseException("Unsupported JSON type " + json);
+ }
+ }
+
+ @Override
+ public JsonElement serialize(PropertyValue src, Type typeOfSrc, JsonSerializationContext context)
+ {
+ if (src instanceof GeneratedPropertyValue)
+ {
+ JsonObject object = (JsonObject) context.serialize(src, Object.class);
+ object.addProperty(DEF_FIELD, ((GeneratedPropertyValue) src).getDefinition());
+ return object;
+ }
+ else
+ {
+ return context.serialize(src.getValue(), Object.class);
+ }
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java
new file mode 100644
index 0000000000..86b4d0e439
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.qpid.disttest.Visitor;
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.controller.Controller;
+
+/**
+ * A command sent between the {@link Controller} and a {@link Client}
+ */
+public abstract class Command
+{
+ private final CommandType type;
+
+ public Command(final CommandType type)
+ {
+ this.type = type;
+ }
+
+ public CommandType getType()
+ {
+ return type;
+ }
+
+ public void accept(Visitor visitor)
+ {
+ visitor.visit(this);
+ }
+
+ @Override
+ public String toString()
+ {
+ return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java
new file mode 100644
index 0000000000..b04cbdaba1
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public enum CommandType
+{
+ CREATE_CONNECTION,
+ CREATE_CONSUMER,
+ CREATE_PRODUCER,
+ CREATE_RESPONDER,
+ CREATE_SESSION,
+ NO_OP,
+ REGISTER_CLIENT,
+ RESPONSE,
+ START_TEST,
+ STOP_CLIENT,
+ TEAR_DOWN_TEST,
+ PARTICIPANT_RESULT,
+ CONSUMER_PARTICIPANT_RESULT,
+ PRODUCER_PARTICIPANT_RESULT,
+ CREATE_MESSAGE_PROVIDER
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
new file mode 100644
index 0000000000..f92e3ea538
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSIING_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_DURABLE_SUBSCRIPTION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_NO_LOCAL;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SYNCHRONOUS_CONSUMER;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_TOPIC;
+
+public class ConsumerParticipantResult extends ParticipantResult
+{
+ private boolean _topic;
+ private boolean _durableSubscription;
+ private boolean _browsingSubscription;
+ private boolean _selector;
+ private boolean _noLocal;
+ private boolean _synchronousConsumer;
+
+ public ConsumerParticipantResult()
+ {
+ super(CommandType.CONSUMER_PARTICIPANT_RESULT);
+ }
+
+ public ConsumerParticipantResult(String participantName)
+ {
+ this();
+ setParticipantName(participantName);
+ }
+
+ @OutputAttribute(attribute=IS_DURABLE_SUBSCRIPTION)
+ public boolean isDurableSubscription()
+ {
+ return _durableSubscription;
+ }
+
+ public void setDurableSubscription(boolean durable)
+ {
+ _durableSubscription = durable;
+ }
+
+
+ @OutputAttribute(attribute=IS_BROWSIING_SUBSCRIPTION)
+ public boolean isBrowsingSubscription()
+ {
+ return _browsingSubscription;
+ }
+
+ public void setBrowsingSubscription(boolean browsingSubscription)
+ {
+ _browsingSubscription = browsingSubscription;
+ }
+
+
+ @OutputAttribute(attribute=IS_SELECTOR)
+ public boolean isSelector()
+ {
+ return _selector;
+ }
+
+ public void setSelector(boolean selector)
+ {
+ _selector = selector;
+ }
+
+
+ @OutputAttribute(attribute=IS_NO_LOCAL)
+ public boolean isNoLocal()
+ {
+ return _noLocal;
+
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
+ _noLocal = noLocal;
+ }
+
+ @OutputAttribute(attribute=IS_SYNCHRONOUS_CONSUMER)
+ public boolean isSynchronousConsumer()
+ {
+ return _synchronousConsumer;
+ }
+
+ public void setSynchronousConsumer(boolean synchronousConsumer)
+ {
+ _synchronousConsumer = synchronousConsumer;
+ }
+
+
+ public void setTopic(boolean isTopic)
+ {
+ _topic = isTopic;
+ }
+
+ @OutputAttribute(attribute=IS_TOPIC)
+ public boolean isTopic()
+ {
+ return _topic;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java
new file mode 100644
index 0000000000..c5a96e9a94
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class CreateConnectionCommand extends Command
+{
+ private String _connectionName;
+ private String _connectionFactoryName;
+
+ public CreateConnectionCommand()
+ {
+ super(CommandType.CREATE_CONNECTION);
+ }
+
+ public CreateConnectionCommand(String connectionName, String connectionFactoryName)
+ {
+ super(CommandType.CREATE_CONNECTION);
+ _connectionName = connectionName;
+ _connectionFactoryName = connectionFactoryName;
+ }
+
+ public void setConnectionName(final String connectionName)
+ {
+ this._connectionName = connectionName;
+ }
+
+ public String getConnectionName()
+ {
+ return _connectionName;
+ }
+
+ public void setConnectionFactoryName(final String connectionFactoryName)
+ {
+ this._connectionFactoryName = connectionFactoryName;
+ }
+
+ public String getConnectionFactoryName()
+ {
+ return _connectionFactoryName;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
new file mode 100644
index 0000000000..678e428f94
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class CreateConsumerCommand extends CreateParticpantCommand
+{
+ private boolean _isTopic;
+ private boolean _isDurableSubscription;
+ private boolean _isBrowsingSubscription;
+ private String _selector;
+ private boolean _noLocal;
+ private boolean _synchronous;
+ private long _receiveTimeout = 5000;
+
+
+ public CreateConsumerCommand()
+ {
+ super(CommandType.CREATE_CONSUMER);
+ }
+
+ public boolean isDurableSubscription()
+ {
+ return _isDurableSubscription;
+ }
+
+ public void setDurableSubscription(final boolean isDurableSubscription)
+ {
+ this._isDurableSubscription = isDurableSubscription;
+ }
+
+ public boolean isBrowsingSubscription()
+ {
+ return _isBrowsingSubscription;
+ }
+
+ public void setBrowsingSubscription(final boolean isBrowsingSubscription)
+ {
+ _isBrowsingSubscription = isBrowsingSubscription;
+ }
+
+ public String getSelector()
+ {
+ return _selector;
+ }
+
+ public void setSelector(final String selector)
+ {
+ this._selector = selector;
+ }
+
+ public boolean isNoLocal()
+ {
+ return _noLocal;
+ }
+
+ public void setNoLocal(final boolean noLocal)
+ {
+ this._noLocal = noLocal;
+ }
+
+ public boolean isTopic()
+ {
+ return _isTopic;
+ }
+
+ public void setTopic(boolean isTopic)
+ {
+ this._isTopic = isTopic;
+ }
+
+ public boolean isSynchronous()
+ {
+ return _synchronous;
+ }
+
+ public void setSynchronous(boolean synchronous)
+ {
+ _synchronous = synchronous;
+ }
+
+ public void setReceiveTimeout(long receiveTimeout)
+ {
+ _receiveTimeout = receiveTimeout;
+
+ }
+
+ public long getReceiveTimeout()
+ {
+ return _receiveTimeout;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java
new file mode 100644
index 0000000000..3f30fdd96a
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import java.util.Map;
+
+import org.apache.qpid.disttest.client.property.PropertyValue;
+
+public class CreateMessageProviderCommand extends Command
+{
+ private String _providerName;
+ private Map<String, PropertyValue> _messageProperties;
+
+ public CreateMessageProviderCommand()
+ {
+ super(CommandType.CREATE_MESSAGE_PROVIDER);
+ }
+
+ public String getProviderName()
+ {
+ return _providerName;
+ }
+
+ public void setProviderName(String providerName)
+ {
+ this._providerName = providerName;
+ }
+
+ public Map<String, PropertyValue> getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+ public void setMessageProperties(Map<String, PropertyValue> messageProperties)
+ {
+ this._messageProperties = messageProperties;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java
new file mode 100644
index 0000000000..f5d4a3ae9b
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public abstract class CreateParticpantCommand extends Command
+{
+ private String _participantName;
+ private String _sessionName;
+ private String _destinationName;
+ private long _numberOfMessages;
+ private int _batchSize;
+ private long _maximumDuration;
+
+ public CreateParticpantCommand(CommandType type)
+ {
+ super(type);
+ }
+
+ public String getParticipantName()
+ {
+ return _participantName;
+ }
+
+ public void setParticipantName(final String participantName)
+ {
+ _participantName = participantName;
+ }
+
+ public String getSessionName()
+ {
+ return _sessionName;
+ }
+
+ public void setSessionName(final String sessionName)
+ {
+ _sessionName = sessionName;
+ }
+
+ public String getDestinationName()
+ {
+ return _destinationName;
+ }
+
+ public void setDestinationName(final String destinationName)
+ {
+ _destinationName = destinationName;
+ }
+
+ public long getNumberOfMessages()
+ {
+ return _numberOfMessages;
+ }
+
+ public void setNumberOfMessages(final long numberOfMessages)
+ {
+ _numberOfMessages = numberOfMessages;
+ }
+
+ public int getBatchSize()
+ {
+ return _batchSize;
+ }
+
+ public void setBatchSize(int batchSize)
+ {
+ _batchSize = batchSize;
+ }
+
+ public long getMaximumDuration()
+ {
+ return _maximumDuration;
+ }
+
+ public void setMaximumDuration(long maximumDuration)
+ {
+ _maximumDuration = maximumDuration;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java
new file mode 100644
index 0000000000..69dfe1ff5a
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class CreateProducerCommand extends CreateParticpantCommand
+{
+ private int _deliveryMode;
+ private int _messageSize;
+ private int _priority;
+ private long _timeToLive;
+ private long _interval;
+ private long _startDelay;
+ private String _messageProviderName;
+
+ public CreateProducerCommand()
+ {
+ super(CommandType.CREATE_PRODUCER);
+ }
+
+ public int getMessageSize()
+ {
+ return _messageSize;
+ }
+
+ public void setMessageSize(final int messageSize)
+ {
+ this._messageSize = messageSize;
+ }
+
+ public int getPriority()
+ {
+ return _priority;
+ }
+
+ public void setPriority(final int priority)
+ {
+ this._priority = priority;
+ }
+
+ public int getDeliveryMode()
+ {
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(final int deliveryMode)
+ {
+ this._deliveryMode = deliveryMode;
+ }
+
+ public long getTimeToLive()
+ {
+ return _timeToLive;
+ }
+
+ public void setTimeToLive(final long timeToLive)
+ {
+ this._timeToLive = timeToLive;
+ }
+
+ public long getInterval()
+ {
+ return _interval;
+ }
+
+ public void setInterval(long interval)
+ {
+ this._interval = interval;
+ }
+
+ public long getStartDelay()
+ {
+ return _startDelay;
+ }
+
+ public void setStartDelay(long startDelay)
+ {
+ this._startDelay = startDelay;
+ }
+
+ public String getMessageProviderName()
+ {
+ return _messageProviderName;
+ }
+
+ public void setMessageProviderName(String messageProviderName)
+ {
+ this._messageProviderName = messageProviderName;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java
new file mode 100644
index 0000000000..85a2b5e548
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class CreateResponderCommand extends Command
+{
+ public CreateResponderCommand()
+ {
+ super(CommandType.CREATE_RESPONDER);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java
new file mode 100644
index 0000000000..f6f59c26af
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class CreateSessionCommand extends Command
+{
+ private String sessionName;
+ private String connectionName;
+ private int acknowledgeMode;
+
+ public CreateSessionCommand()
+ {
+ super(CommandType.CREATE_SESSION);
+ }
+
+ public String getSessionName()
+ {
+ return sessionName;
+ }
+
+ public void setSessionName(final String sessionName)
+ {
+ this.sessionName = sessionName;
+ }
+
+ public String getConnectionName()
+ {
+ return connectionName;
+ }
+
+ public void setConnectionName(final String connectionName)
+ {
+ this.connectionName = connectionName;
+ }
+
+ public int getAcknowledgeMode()
+ {
+ return acknowledgeMode;
+ }
+
+ public void setAcknowledgeMode(final int acknowledgeMode)
+ {
+ this.acknowledgeMode = acknowledgeMode;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java
new file mode 100644
index 0000000000..1cdaf00163
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+
+public class NoOpCommand extends Command
+{
+ public NoOpCommand()
+ {
+ super(CommandType.NO_OP);
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java
new file mode 100644
index 0000000000..b912eaa1cb
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Marks an attribute of {@link ParticipantResult} that should be written to the test output file.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OutputAttribute
+{
+ ParticipantAttribute attribute();
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
new file mode 100644
index 0000000000..c2fb38cc96
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import org.apache.qpid.disttest.client.Participant;
+
+/**
+ * Meta-date representing the attributes of {@link Participant} that we write to the test output file.
+ *
+ * Order of declaration is currently important - it determines they order they appear in the output.
+ *
+ * @see OutputAttribute
+ */
+public enum ParticipantAttribute
+{
+ TEST_NAME("Test Name"),
+ ITERATION_NUMBER("Iteration number"),
+ CONFIGURED_CLIENT_NAME("Client Name"),
+ PARTICIPANT_NAME("Participant name"),
+ NUMBER_OF_MESSAGES_PROCESSED("Number of messages"),
+ PAYLOAD_SIZE("Payload size (bytes)"),
+ PRIORITY("Priority"),
+ TIME_TO_LIVE("Time to live (ms)"),
+ DELIVERY_MODE("Delivery mode"),
+ BATCH_SIZE("Batch size"),
+ MAXIMUM_DURATION("Maximum duration (ms)"),
+ PRODUCER_START_DELAY("Producer start delay (ms)"),
+ PRODUCER_INTERVAL("Producer interval (ms)"),
+ IS_TOPIC("Is topic"),
+ IS_DURABLE_SUBSCRIPTION("Is durable subscription"),
+ IS_BROWSIING_SUBSCRIPTION("Is browsing subscription"),
+ IS_SELECTOR("Is selector"),
+ IS_NO_LOCAL("Is no local"),
+ IS_SYNCHRONOUS_CONSUMER("Is synchronous consumer"),
+ TOTAL_PAYLOAD_PROCESSED("Total payload processed (bytes)"),
+ THROUGHPUT("Throughput (kbytes/s)"),
+ TIME_TAKEN("Time taken (ms)"),
+ ERROR_MESSAGE("Error message");
+
+ private String _displayName;
+
+ ParticipantAttribute(String displayName)
+ {
+ _displayName = displayName;
+ }
+
+ public String getDisplayName()
+ {
+ return _displayName;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java
new file mode 100644
index 0000000000..95a19ceefc
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import java.beans.PropertyDescriptor;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.beanutils.PropertyUtils;
+
+
+public class ParticipantAttributeExtractor
+{
+ public static Map<ParticipantAttribute, Object> getAttributes(Object targetObject)
+ {
+ Map<ParticipantAttribute, Object> attributes = new HashMap<ParticipantAttribute, Object>();
+
+
+ PropertyDescriptor[] descriptors = PropertyUtils.getPropertyDescriptors(targetObject);
+ for (PropertyDescriptor propertyDescriptor : descriptors)
+ {
+ final Method readMethod = getPropertyReadMethod(targetObject, propertyDescriptor);
+
+ for (Annotation annotation : readMethod.getDeclaredAnnotations())
+ {
+ if (annotation instanceof OutputAttribute)
+ {
+ OutputAttribute outputAttribute = (OutputAttribute) annotation;
+
+ Object value = getPropertyValue(targetObject, propertyDescriptor.getName());
+ attributes.put(outputAttribute.attribute(), value);
+ }
+ }
+ }
+
+ return attributes;
+ }
+
+ public static Method getPropertyReadMethod(Object targetObject, PropertyDescriptor propertyDescriptor)
+ {
+ final Method readMethod = propertyDescriptor.getReadMethod();
+
+ if (readMethod == null)
+ {
+ throw new RuntimeException("No read method for property " + propertyDescriptor.getName() + " on " + targetObject);
+ }
+ return readMethod;
+ }
+
+ public static Object getPropertyValue(Object targetObject, String propertyName)
+ {
+ try
+ {
+ return PropertyUtils.getProperty(targetObject, propertyName);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException("Couldn't get value of property " + propertyName + " from " + targetObject, e);
+ }
+ catch (InvocationTargetException e)
+ {
+ throw new RuntimeException("Couldn't get value of property " + propertyName + " from " + targetObject, e);
+ }
+ catch (NoSuchMethodException e)
+ {
+ throw new RuntimeException("Couldn't get value of property " + propertyName + " from " + targetObject, e);
+ }
+
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java
new file mode 100644
index 0000000000..4550f10c65
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import static org.apache.qpid.disttest.message.ParticipantAttribute.BATCH_SIZE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.CONFIGURED_CLIENT_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.ITERATION_NUMBER;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.MAXIMUM_DURATION;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PAYLOAD_SIZE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.NUMBER_OF_MESSAGES_PROCESSED;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.THROUGHPUT;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PARTICIPANT_NAME;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TEST_NAME;
+
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Map;
+
+public class ParticipantResult extends Response
+{
+ private String _testName;
+ private String _participantName;
+ private long _startInMillis;
+ private long _endInMillis;
+ private int _batchSize;
+ private long _maximumDuration;
+ private int _iterationNumber;
+
+ private String _configuredClientName;
+
+ private long _numberOfMessagesProcessed;
+ private long _totalPayloadProcessed;
+ private int _payloadSize;
+ private double _throughput;
+
+ public static final Comparator<? super ParticipantResult> PARTICIPANT_NAME_COMPARATOR = new Comparator<ParticipantResult>()
+ {
+ @Override
+ public int compare(ParticipantResult participantResult1, ParticipantResult participantResult2)
+ {
+ return participantResult1.getParticipantName().compareTo(participantResult2.getParticipantName());
+ }
+ };
+
+ public ParticipantResult()
+ {
+ this(CommandType.PARTICIPANT_RESULT);
+ }
+
+ public ParticipantResult(CommandType commandType)
+ {
+ super(commandType);
+ }
+
+ public ParticipantResult(String participantName)
+ {
+ this();
+ setParticipantName(participantName);
+ }
+
+ @OutputAttribute(attribute=TEST_NAME)
+ public String getTestName()
+ {
+ return _testName;
+ }
+
+ public void setTestName(String testName)
+ {
+ _testName = testName;
+ }
+
+ @OutputAttribute(attribute=ITERATION_NUMBER)
+ public int getIterationNumber()
+ {
+ return _iterationNumber;
+ }
+
+ public void setIterationNumber(int iterationNumber)
+ {
+ _iterationNumber = iterationNumber;
+ }
+
+ public void setStartDate(Date start)
+ {
+ _startInMillis = start.getTime();
+ }
+
+ public void setEndDate(Date end)
+ {
+ _endInMillis = end.getTime();
+ }
+
+ public Date getStartDate()
+ {
+ return new Date(_startInMillis);
+ }
+
+ public Date getEndDate()
+ {
+ return new Date(_endInMillis);
+ }
+
+
+ public long getStartInMillis()
+ {
+ return _startInMillis;
+ }
+
+ public long getEndInMillis()
+ {
+ return _endInMillis;
+ }
+
+
+ @OutputAttribute(attribute=PARTICIPANT_NAME)
+ public String getParticipantName()
+ {
+ return _participantName;
+ }
+
+
+ public void setParticipantName(String participantName)
+ {
+ _participantName = participantName;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.TIME_TAKEN)
+ public long getTimeTaken()
+ {
+ return _endInMillis - _startInMillis;
+ }
+
+ @OutputAttribute(attribute=CONFIGURED_CLIENT_NAME)
+ public String getConfiguredClientName()
+ {
+ return _configuredClientName;
+ }
+
+ public void setConfiguredClientName(String configuredClientName)
+ {
+ _configuredClientName = configuredClientName;
+ }
+
+ @OutputAttribute(attribute=NUMBER_OF_MESSAGES_PROCESSED)
+ public long getNumberOfMessagesProcessed()
+ {
+ return _numberOfMessagesProcessed;
+ }
+
+ public void setNumberOfMessagesProcessed(long numberOfMessagesProcessed)
+ {
+ _numberOfMessagesProcessed = numberOfMessagesProcessed;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.TOTAL_PAYLOAD_PROCESSED)
+ public long getTotalPayloadProcessed()
+ {
+ return _totalPayloadProcessed;
+ }
+
+ @OutputAttribute(attribute = PAYLOAD_SIZE)
+ public int getPayloadSize()
+ {
+ return _payloadSize;
+ }
+
+ public void setPayloadSize(int payloadSize)
+ {
+ _payloadSize = payloadSize;
+ }
+
+ public void setTotalPayloadProcessed(long totalPayloadProcessed)
+ {
+ _totalPayloadProcessed = totalPayloadProcessed;
+ }
+
+ public Map<ParticipantAttribute, Object> getAttributes()
+ {
+ return ParticipantAttributeExtractor.getAttributes(this);
+ }
+
+ public void setBatchSize(int batchSize)
+ {
+ _batchSize = batchSize;
+ }
+
+ @OutputAttribute(attribute=BATCH_SIZE)
+ public int getBatchSize()
+ {
+ return _batchSize;
+ }
+
+ public void setMaximumDuration(long maximumDuration)
+ {
+ _maximumDuration = maximumDuration;
+ }
+
+ @OutputAttribute(attribute=MAXIMUM_DURATION)
+ public long getMaximumDuration()
+ {
+ return _maximumDuration;
+ }
+
+ @OutputAttribute(attribute=THROUGHPUT)
+ public double getThroughput()
+ {
+ return _throughput;
+ }
+
+ public void setThroughput(double throughput)
+ {
+ _throughput = throughput;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java
new file mode 100644
index 0000000000..766c90eec8
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+import static org.apache.qpid.disttest.message.ParticipantAttribute.DELIVERY_MODE;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PRIORITY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PRODUCER_INTERVAL;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.PRODUCER_START_DELAY;
+import static org.apache.qpid.disttest.message.ParticipantAttribute.TIME_TO_LIVE;
+
+public class ProducerParticipantResult extends ParticipantResult
+{
+ private int _priority;
+ private long _timeToLive;
+ private long _startDelay;
+ private long _interval;
+ private int _deliveryMode;
+ public ProducerParticipantResult()
+ {
+ super(CommandType.PRODUCER_PARTICIPANT_RESULT);
+ }
+
+ public ProducerParticipantResult(String participantName)
+ {
+ this();
+ setParticipantName(participantName);
+ }
+
+ @OutputAttribute(attribute=PRIORITY)
+ public int getPriority()
+ {
+ return _priority;
+ }
+
+ public void setPriority(int priority)
+ {
+ _priority = priority;
+ }
+
+ @OutputAttribute(attribute=TIME_TO_LIVE)
+ public long getTimeToLive()
+ {
+ return _timeToLive;
+ }
+
+ public void setTimeToLive(long timeToLive)
+ {
+ _timeToLive = timeToLive;
+ }
+
+ @OutputAttribute(attribute=PRODUCER_START_DELAY)
+ public long getStartDelay()
+ {
+ return _startDelay;
+ }
+
+ public void setStartDelay(long startDelay)
+ {
+ _startDelay = startDelay;
+ }
+
+ @OutputAttribute(attribute=PRODUCER_INTERVAL)
+ public long getInterval()
+ {
+ return _interval;
+ }
+
+ public void setInterval(long producerInterval)
+ {
+ _interval = producerInterval;
+ }
+
+ @OutputAttribute(attribute=DELIVERY_MODE)
+ public int getDeliveryMode()
+ {
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(int deliveryMode)
+ {
+ this._deliveryMode = deliveryMode;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java
new file mode 100644
index 0000000000..af880a37d9
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class RegisterClientCommand extends Command
+{
+ private final String _clientName;
+ private final String _clientQueueName;
+
+ public RegisterClientCommand(final String clientName, final String clientQueueName)
+ {
+ super(CommandType.REGISTER_CLIENT);
+ _clientName = clientName;
+ _clientQueueName = clientQueueName;
+ }
+
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ public String getClientQueueName()
+ {
+ return _clientQueueName;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java
new file mode 100644
index 0000000000..aac056efcb
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+
+public class Response extends Command
+{
+ protected String _registeredClientName;
+ protected String _errorMessage;
+ private CommandType _inReplyToCommandType;
+
+ public Response(final String registeredclientName, final CommandType inReplyToCommandType, final String errorMessage)
+ {
+ super(CommandType.RESPONSE);
+ _registeredClientName = registeredclientName;
+ _errorMessage = errorMessage;
+ _inReplyToCommandType = inReplyToCommandType;
+ }
+
+ public Response(String clientName, CommandType inReplyToCommandType)
+ {
+ this(clientName, inReplyToCommandType, null);
+ }
+
+ /**
+ * Provided so that subclasses can call super(commandType)
+ */
+ protected Response(CommandType commandType)
+ {
+ super(commandType);
+ }
+
+ public String getRegisteredClientName()
+ {
+ return _registeredClientName;
+ }
+
+ public void setRegisteredClientName(String registeredClientName)
+ {
+ _registeredClientName = registeredClientName;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.ERROR_MESSAGE)
+ public String getErrorMessage()
+ {
+ return _errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage)
+ {
+ _errorMessage = errorMessage;
+ }
+
+ public boolean hasError()
+ {
+ return _errorMessage != null;
+ }
+
+ public CommandType getInReplyToCommandType()
+ {
+ return _inReplyToCommandType;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java
new file mode 100644
index 0000000000..4a53697ecd
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+public class StartTestCommand extends Command
+{
+
+ public StartTestCommand()
+ {
+ super(CommandType.START_TEST);
+ }
+
+} \ No newline at end of file
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java
new file mode 100644
index 0000000000..08758aaa69
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.message;
+
+public class StopClientCommand extends Command
+{
+ public StopClientCommand()
+ {
+ super(CommandType.STOP_CLIENT);
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java
new file mode 100644
index 0000000000..6b1367d4f9
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.message;
+
+public class TearDownTestCommand extends Command
+{
+
+ public TearDownTestCommand()
+ {
+ super(CommandType.TEAR_DOWN_TEST);
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java
new file mode 100644
index 0000000000..5e6da2e65b
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.disttest.message.ParticipantResult;
+
+public class AggregatedTestResult implements ITestResult
+{
+ private ParticipantResult _allParticipantResult;
+ private ParticipantResult _allConsumerParticipantResult;
+ private ParticipantResult _allProducerParticipantResult;
+ private final ITestResult _originalTestResult;
+
+ public AggregatedTestResult(ITestResult originalTestResult)
+ {
+ _originalTestResult = originalTestResult;
+ }
+
+ /**
+ * Returns the result where {@link ParticipantResult#getNumberOfMessagesProcessed()}
+ * is the total number of messages consumed during the test, and {@link ParticipantResult#getTimeTaken()}
+ * is the time between the start of the first producer and the end of the last consumer to finish.
+ */
+ public ParticipantResult getAllParticipantResult()
+ {
+ return _allParticipantResult;
+ }
+
+ public void setAllParticipantResult(ParticipantResult allParticipantResult)
+ {
+ _allParticipantResult = allParticipantResult;
+ }
+
+ public ParticipantResult getAllConsumerParticipantResult()
+ {
+ return _allConsumerParticipantResult;
+ }
+ public void setAllConsumerParticipantResult(ParticipantResult allConsumerParticipantResult)
+ {
+ _allConsumerParticipantResult = allConsumerParticipantResult;
+ }
+ public ParticipantResult getAllProducerParticipantResult()
+ {
+ return _allProducerParticipantResult;
+ }
+ public void setAllProducerParticipantResult(ParticipantResult allProducerParticipantResult)
+ {
+ _allProducerParticipantResult = allProducerParticipantResult;
+ }
+
+ // TODO should weaken to Collection
+ @Override
+ public List<ParticipantResult> getParticipantResults()
+ {
+ List<ParticipantResult> allParticipantResults = new ArrayList<ParticipantResult>(_originalTestResult.getParticipantResults());
+
+ allParticipantResults.add(_allConsumerParticipantResult);
+ allParticipantResults.add(_allProducerParticipantResult);
+ allParticipantResults.add(_allParticipantResult);
+
+ return allParticipantResults;
+ }
+
+ @Override
+ public boolean hasErrors()
+ {
+ return _originalTestResult.hasErrors();
+ }
+
+ @Override
+ public String getName()
+ {
+ return _originalTestResult.getName();
+ }
+
+
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java
new file mode 100644
index 0000000000..cde30d36e5
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
+
+public class Aggregator
+{
+
+ private TestResultAggregator _testResultAggregator = new TestResultAggregator();
+
+ public ResultsForAllTests aggregateResults(ResultsForAllTests rawResultsForAllTests)
+ {
+
+ ResultsForAllTests aggregatedResultsForAllTests = new ResultsForAllTests();
+
+
+ for (ITestResult testResult : rawResultsForAllTests.getTestResults())
+ {
+ AggregatedTestResult aggregateTestResult = _testResultAggregator.aggregateTestResult(testResult);
+ aggregatedResultsForAllTests.add(aggregateTestResult);
+ }
+
+
+ return aggregatedResultsForAllTests;
+ }
+
+ void setTestResultAggregator(TestResultAggregator testResultAggregator)
+ {
+ _testResultAggregator = testResultAggregator;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java
new file mode 100644
index 0000000000..3f9cdff69d
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.List;
+
+import org.apache.qpid.disttest.message.ParticipantResult;
+
+// TODO rename me!!
+public interface ITestResult
+{
+
+ // TODO should weaken to Collection
+ List<ParticipantResult> getParticipantResults();
+
+ boolean hasErrors();
+
+ String getName();
+
+} \ No newline at end of file
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
new file mode 100644
index 0000000000..6c4e4f87ac
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import org.apache.qpid.disttest.message.ParticipantResult;
+
+public class ParticipantResultAggregator
+{
+ private final String _aggregatedResultName;
+ private final Class<? extends ParticipantResult> _targetClass;
+
+ private long _minStartDate = Long.MAX_VALUE;
+ private long _maxEndDate = 0;
+ private long _numberOfMessagesProcessed = 0;
+ private long _totalPayloadProcessed = 0;
+
+ private NavigableSet<Integer> _encounteredPayloadSizes = new TreeSet<Integer>();
+ private NavigableSet<Integer> _encounteredIterationNumbers = new TreeSet<Integer>();
+ private NavigableSet<String> _encountedTestNames = new TreeSet<String>();
+
+ public ParticipantResultAggregator(Class<? extends ParticipantResult> taregtClass, String aggregateResultName)
+ {
+ _aggregatedResultName = aggregateResultName;
+ _targetClass = taregtClass;
+ }
+
+ public void aggregate(ParticipantResult result)
+ {
+ if (isAggregatable(result))
+ {
+ rollupConstantAttributes(result);
+ computeVariableAttributes(result);
+ }
+ }
+
+ public ParticipantResult getAggregatedResult()
+ {
+ ParticipantResult aggregatedResult = new ParticipantResult(_aggregatedResultName);
+
+ setRolledUpConstantAttributes(aggregatedResult);
+ setComputedVariableAttributes(aggregatedResult);
+
+ return aggregatedResult;
+ }
+
+ private boolean isAggregatable(ParticipantResult result)
+ {
+ return _targetClass.isAssignableFrom(result.getClass());
+ }
+
+ private void computeVariableAttributes(ParticipantResult result)
+ {
+ _numberOfMessagesProcessed += result.getNumberOfMessagesProcessed();
+ _totalPayloadProcessed += result.getTotalPayloadProcessed();
+ _minStartDate = Math.min(_minStartDate, result.getStartInMillis());
+ _maxEndDate = Math.max(_maxEndDate, result.getEndInMillis());
+ }
+
+ private void rollupConstantAttributes(ParticipantResult result)
+ {
+ if (result.getTestName() != null)
+ {
+ _encountedTestNames.add(result.getTestName());
+ }
+ _encounteredPayloadSizes.add(result.getPayloadSize());
+ _encounteredIterationNumbers.add(result.getIterationNumber());
+ }
+
+ private void setComputedVariableAttributes(ParticipantResult aggregatedResult)
+ {
+ aggregatedResult.setNumberOfMessagesProcessed(_numberOfMessagesProcessed);
+ aggregatedResult.setTotalPayloadProcessed(_totalPayloadProcessed);
+ aggregatedResult.setStartDate(new Date(_minStartDate));
+ aggregatedResult.setEndDate(new Date(_maxEndDate));
+ aggregatedResult.setThroughput(calculateThroughputInKiloBytesPerSecond());
+ }
+
+ private void setRolledUpConstantAttributes(ParticipantResult aggregatedResult)
+ {
+ if (_encounteredIterationNumbers.size() == 1)
+ {
+ aggregatedResult.setIterationNumber( _encounteredIterationNumbers.first());
+ }
+ if (_encounteredPayloadSizes.size() == 1)
+ {
+ aggregatedResult.setPayloadSize(_encounteredPayloadSizes.first());
+ }
+ if (_encountedTestNames.size() == 1)
+ {
+ aggregatedResult.setTestName(_encountedTestNames.first());
+ }
+ }
+
+ private double calculateThroughputInKiloBytesPerSecond()
+ {
+ double durationInMillis = _maxEndDate - _minStartDate;
+ double durationInSeconds = durationInMillis / 1000;
+ double totalPayloadProcessedInKiloBytes = ((double)_totalPayloadProcessed) / 1024;
+
+ return totalPayloadProcessedInKiloBytes/durationInSeconds;
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java
new file mode 100644
index 0000000000..1938add560
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.ProducerParticipantResult;
+
+public class TestResultAggregator
+{
+ static final String AGGREGATED_ERROR_MESSAGE = "One or more participants reported errors.";
+ public static final String ALL_PARTICIPANTS_NAME = "All";
+ public static final String ALL_PRODUCER_PARTICIPANTS_NAME = "All Producers";
+ public static final String ALL_CONSUMER_PARTICIPANTS_NAME = "All Consumers";
+
+ public AggregatedTestResult aggregateTestResult(ITestResult originalTestResult)
+ {
+ AggregatedTestResult newTestResult = new AggregatedTestResult(originalTestResult);
+
+ ParticipantResultAggregator consumerParticipantResultAggregator = new ParticipantResultAggregator(ConsumerParticipantResult.class,
+ ALL_CONSUMER_PARTICIPANTS_NAME);
+ ParticipantResultAggregator producerParticipantResultAggregator = new ParticipantResultAggregator(ProducerParticipantResult.class,
+ ALL_PRODUCER_PARTICIPANTS_NAME);
+
+ boolean hasError = aggregate(originalTestResult,
+ consumerParticipantResultAggregator,
+ producerParticipantResultAggregator);
+
+ ParticipantResult aggregatedProducerResult = producerParticipantResultAggregator.getAggregatedResult();
+ newTestResult.setAllProducerParticipantResult(aggregatedProducerResult);
+
+ ParticipantResult aggregaredConsumerResult = consumerParticipantResultAggregator.getAggregatedResult();
+ newTestResult.setAllConsumerParticipantResult(aggregaredConsumerResult);
+
+ ParticipantResult aggregatedAllResult = buildAllResultFromOtherAggregatedResults(
+ aggregatedProducerResult, aggregaredConsumerResult);
+
+ if (hasError)
+ {
+ aggregatedAllResult.setErrorMessage(TestResultAggregator.AGGREGATED_ERROR_MESSAGE);
+ }
+ newTestResult.setAllParticipantResult(aggregatedAllResult);
+
+ return newTestResult;
+ }
+
+ private boolean aggregate(ITestResult originalTestResult,
+ ParticipantResultAggregator consumerParticipantResultAggregator,
+ ParticipantResultAggregator producerParticipantResultAggregator)
+ {
+ boolean hasError = false;
+ for (ParticipantResult result : originalTestResult.getParticipantResults())
+ {
+ consumerParticipantResultAggregator.aggregate(result);
+ producerParticipantResultAggregator.aggregate(result);
+
+ if (result.hasError())
+ {
+ hasError = true;
+ }
+ }
+ return hasError;
+ }
+
+ private ParticipantResult buildAllResultFromOtherAggregatedResults(
+ ParticipantResult aggregatedProducerResult, ParticipantResult aggregaredConsumerResult)
+ {
+ ParticipantResult aggregatedAllResult = new ParticipantResult(ALL_PARTICIPANTS_NAME);
+ aggregatedAllResult.setStartDate(aggregatedProducerResult.getStartDate());
+
+ aggregatedAllResult.setEndDate(aggregaredConsumerResult.getEndDate());
+
+ aggregatedAllResult.setIterationNumber(aggregaredConsumerResult.getIterationNumber());
+ aggregatedAllResult.setTestName(aggregaredConsumerResult.getTestName());
+ aggregatedAllResult.setNumberOfMessagesProcessed(aggregaredConsumerResult.getNumberOfMessagesProcessed());
+ aggregatedAllResult.setPayloadSize(aggregaredConsumerResult.getPayloadSize());
+ aggregatedAllResult.setTotalPayloadProcessed(aggregaredConsumerResult.getTotalPayloadProcessed());
+ aggregatedAllResult.setThroughput(aggregaredConsumerResult.getThroughput());
+
+ return aggregatedAllResult;
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java
new file mode 100644
index 0000000000..52e53ca624
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.results.formatting;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
+import org.apache.qpid.disttest.message.ParticipantAttribute;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.results.aggregation.ITestResult;
+
+/**
+ * produces CSV output using the ordered enums in {@link ParticipantAttribute}
+ */
+public class CSVFormater
+{
+ public String format(ResultsForAllTests results)
+ {
+ StringBuilder builder = new StringBuilder();
+
+ builder.append(header());
+
+ List<ITestResult> testResults = results.getTestResults();
+
+ for (ITestResult testResult : testResults)
+ {
+
+ List<ParticipantResult> participantResults = new ArrayList<ParticipantResult>(testResult.getParticipantResults());
+ Collections.sort(participantResults, new CSVOrderParticipantResultComparator());
+
+ for (ParticipantResult participantResult : participantResults)
+ {
+ Map<ParticipantAttribute, Object> attributes = participantResult.getAttributes();
+ builder.append(row(attributes));
+ }
+ }
+
+ return builder.toString();
+ }
+
+ /**
+ * return a row, including a newline character at the end
+ */
+ private String row(Map<ParticipantAttribute, Object> attributeValueMap)
+ {
+ List<Object> attributeValues = new ArrayList<Object>();
+ for (ParticipantAttribute attribute : ParticipantAttribute.values())
+ {
+ attributeValues.add(attributeValueMap.get(attribute));
+ }
+
+ String row = StringUtils.join(attributeValues.toArray(), ",");
+ return row + "\n";
+ }
+
+ /** return the header row, including a newline at the end */
+ private String header()
+ {
+ List<String> displayNames = new ArrayList<String>();
+ for (ParticipantAttribute attribute : ParticipantAttribute.values())
+ {
+ displayNames.add(attribute.getDisplayName());
+ }
+
+ String header = StringUtils.join(displayNames.toArray(), ",");
+ return header + "\n";
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java
new file mode 100644
index 0000000000..0e1fbbc3c6
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.results.formatting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.CompareToBuilder;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.ProducerParticipantResult;
+
+public class CSVOrderParticipantResultComparator implements Comparator<ParticipantResult>
+{
+ // TODO yuk
+ private static final Map<Class<? extends ParticipantResult>, Integer> TYPE_CODES = new HashMap<Class<? extends ParticipantResult>, Integer>();
+ static {
+ TYPE_CODES.put(ProducerParticipantResult.class, 0);
+ TYPE_CODES.put(ConsumerParticipantResult.class, 1);
+ TYPE_CODES.put(ParticipantResult.class, 2);
+ }
+
+ @Override
+ public int compare(ParticipantResult left, ParticipantResult right)
+ {
+ return new CompareToBuilder()
+ .append(getTypeCode(left), getTypeCode(right))
+ .append(left.getParticipantName(), right.getParticipantName())
+ .toComparison();
+ }
+
+
+ private int getTypeCode(ParticipantResult participantResult)
+ {
+ return TYPE_CODES.get(participantResult.getClass());
+ }
+
+}
diff --git a/qpid/java/perftests/src/main/resources/perftests.properties b/qpid/java/perftests/src/main/resources/perftests.properties
new file mode 100644
index 0000000000..d8823f9dc5
--- /dev/null
+++ b/qpid/java/perftests/src/main/resources/perftests.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+connectionfactory.connectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'