diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-04-05 16:04:10 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-04-05 16:04:10 +0000 |
| commit | eb58a828676bf1fad33c317f86a909f18cadacbc (patch) | |
| tree | aef0634df4867e761ea65792d5e19eacbc029f49 /qpid/java/perftests/src/main | |
| parent | 347ee6bc0e71ca79d729ccf53c134fbe01289621 (diff) | |
| download | qpid-python-eb58a828676bf1fad33c317f86a909f18cadacbc.tar.gz | |
QPID-3936: initial checkin of new testing framework, initially to be used for performance testing but later to be expanded for use with other testing scenarios.
Applied patch from Philip Harvey <phil@philharveyonline.com>, Oleksandr Rudyy <orudyy@gmail.com>, Andrew MacBean <andymacbean@gmail.com>, Keith Wall <kwall@apache.org>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1309918 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
84 files changed, 7829 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java new file mode 100644 index 0000000000..9e865010f8 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/AbstractRunner.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +import java.io.FileInputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import javax.naming.Context; +import javax.naming.InitialContext; + +public class AbstractRunner +{ + public static final String JNDI_CONFIG_PROP = "jndi-config"; + public static final String JNDI_CONFIG_DEFAULT = "perftests-jndi.properties"; + + private Map<String,String> _cliOptions = new HashMap<String, String>(); + { + getCliOptions().put(JNDI_CONFIG_PROP, JNDI_CONFIG_DEFAULT); + } + + protected Context getContext() + { + Context context = null; + + try + { + final Properties properties = new Properties(); + properties.load(new FileInputStream(getJndiConfig())); + context = new InitialContext(properties); + } + catch (Exception e) + { + throw new DistributedTestException("Exception while loading JNDI properties", e); + } + + return context; + } + + public void parseArgumentsIntoConfig(String[] args) + { + ArgumentParser argumentParser = new ArgumentParser(); + argumentParser.parseArgumentsIntoConfig(getCliOptions(), args); + } + + protected String getJndiConfig() + { + return getCliOptions().get(AbstractRunner.JNDI_CONFIG_PROP); + } + + protected Map<String,String> getCliOptions() + { + return _cliOptions; + } +}
\ No newline at end of file diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java new file mode 100644 index 0000000000..8c1f8675e3 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ArgumentParser.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +import java.util.Map; + +public class ArgumentParser +{ + public void parseArgumentsIntoConfig(Map<String, String> initialValues, String[] args) + { + for(String arg: args) + { + String[] splitArg = arg.split("="); + if(splitArg.length != 2) + { + throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg); + } + + if(initialValues.put(splitArg[0], splitArg[1]) == null) + { + throw new IllegalArgumentException("not a valid configuration property: " + arg); + } + } + + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java new file mode 100644 index 0000000000..9c2f4a3d95 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ClientRunner.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +import javax.naming.Context; +import javax.naming.NamingException; + +import org.apache.qpid.disttest.client.Client; +import org.apache.qpid.disttest.jms.ClientJmsDelegate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClientRunner extends AbstractRunner +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ClientRunner.class); + + public static final String NUM_OF_CLIENTS_PROP = "number-of-clients"; + + public static final String NUM_OF_CLIENTS_DEFAULT = "1"; + + public ClientRunner() + { + getCliOptions().put(NUM_OF_CLIENTS_PROP, NUM_OF_CLIENTS_DEFAULT); + } + + public static void main(String[] args) + { + ClientRunner runner = new ClientRunner(); + runner.parseArgumentsIntoConfig(args); + runner.runClients(); + } + + public void setJndiPropertiesFileLocation(String jndiConfigFileLocation) + { + getCliOptions().put(JNDI_CONFIG_PROP, jndiConfigFileLocation); + } + + /** + * Run the clients in the background + */ + public void runClients() + { + int numClients = Integer.parseInt(getCliOptions().get(NUM_OF_CLIENTS_PROP)); + + Context context = getContext(); + + for(int i = 1; i <= numClients; i++) + { + createBackgroundClient(context); + } + } + + private void createBackgroundClient(Context context) + { + try + { + final Client client = new Client(new ClientJmsDelegate(context)); + + final Thread clientThread = new Thread(new Runnable() + { + @Override + public void run() + { + LOGGER.info("Starting client " + client.getClientName()); + client.start(); + client.waitUntilStopped(); + LOGGER.info("Stopped client " + client.getClientName()); + } + }); + clientThread.start(); + } + catch (NamingException e) + { + throw new DistributedTestException("Exception while creating client instance", e); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java new file mode 100644 index 0000000000..07d78790f0 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/ControllerRunner.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; + +import javax.naming.Context; + +import org.apache.qpid.disttest.controller.Controller; +import org.apache.qpid.disttest.controller.ResultsForAllTests; +import org.apache.qpid.disttest.controller.config.Config; +import org.apache.qpid.disttest.controller.config.ConfigReader; +import org.apache.qpid.disttest.jms.ControllerJmsDelegate; +import org.apache.qpid.disttest.results.aggregation.Aggregator; +import org.apache.qpid.disttest.results.formatting.CSVFormater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ControllerRunner extends AbstractRunner +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRunner.class); + + public static final String TEST_CONFIG_PROP = "test-config"; + public static final String DISTRIBUTED_PROP = "distributed"; + public static final String OUTPUT_FILE_PROP = "outputfile"; + + private static final String TEST_CONFIG_DEFAULT = "perftests-config.json"; + private static final String DISTRIBUTED_DEFAULT = "false"; + private static final String OUTPUT_FILE_DEFAULT = "output.csv"; + + private Controller _controller; + + private final Aggregator _aggregator = new Aggregator(); + + + public ControllerRunner() + { + getCliOptions().put(TEST_CONFIG_PROP, TEST_CONFIG_DEFAULT); + getCliOptions().put(DISTRIBUTED_PROP, DISTRIBUTED_DEFAULT); + getCliOptions().put(OUTPUT_FILE_PROP, OUTPUT_FILE_DEFAULT); + } + + public static void main(String[] args) throws Exception + { + ControllerRunner runner = new ControllerRunner(); + runner.parseArgumentsIntoConfig(args); + runner.runController(); + } + + public void runController() throws Exception + { + Context context = getContext(); + + ControllerJmsDelegate jmsDelegate = new ControllerJmsDelegate(context); + + try + { + runTests(jmsDelegate); + } + finally + { + jmsDelegate.closeConnections(); + } + } + + private void runTests(ControllerJmsDelegate jmsDelegate) + { + + _controller = new Controller(jmsDelegate, + DistributedTestConstants.REGISTRATION_TIMEOUT, DistributedTestConstants.COMMAND_RESPONSE_TIMEOUT); + + Config testConfig = getTestConfig(); + _controller.setConfig(testConfig); + + if(!isDistributed()) + { + //we must create the required test clients, running in single-jvm mode + int numClients = testConfig.getTotalNumberOfClients(); + for (int i = 1; i <= numClients; i++) + { + ClientRunner clientRunner = new ClientRunner(); + clientRunner.setJndiPropertiesFileLocation(getJndiConfig()); + clientRunner.runClients(); + } + } + + ResultsForAllTests resultsForAllTests = null; + try + { + _controller.awaitClientRegistrations(); + + ResultsForAllTests rawResultsForAllTests = _controller.runAllTests(); + resultsForAllTests = _aggregator.aggregateResults(rawResultsForAllTests); + } + finally + { + _controller.stopAllRegisteredClients(); + } + + final String outputFile = getOutputFile(); + writeResultsToFile(resultsForAllTests, outputFile); + } + + private void writeResultsToFile(ResultsForAllTests resultsForAllTests, String outputFile) + { + FileWriter writer = null; + try + { + final String outputCsv = new CSVFormater().format(resultsForAllTests); + writer = new FileWriter(outputFile); + writer.write(outputCsv); + LOGGER.info("Wrote " + resultsForAllTests.getTestResults().size() + " test result(s) to output file " + outputFile); + } + catch (IOException e) + { + throw new DistributedTestException("Unable to write output file " + outputFile, e); + } + finally + { + if (writer != null) + { + try + { + writer.close(); + } + catch (IOException e) + { + LOGGER.error("Failed to close stream for file " + outputFile, e); + } + } + } + } + + private String getOutputFile() + { + return String.valueOf(getCliOptions().get(ControllerRunner.OUTPUT_FILE_PROP)); + } + + private Config getTestConfig() + { + ConfigReader configReader = new ConfigReader(); + Config testConfig; + try + { + testConfig = configReader.getConfigFromFile(getCliOptions().get(ControllerRunner.TEST_CONFIG_PROP)); + } + catch (FileNotFoundException e) + { + throw new DistributedTestException("Exception while loading test config", e); + } + return testConfig; + } + + private boolean isDistributed() + { + return Boolean.valueOf(getCliOptions().get(ControllerRunner.DISTRIBUTED_PROP)); + } + + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java new file mode 100644 index 0000000000..c4892edca9 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestConstants.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +public abstract class DistributedTestConstants +{ + public static final String PERF_TEST_PROPERTIES_FILE = "perftests.properties"; + + public static final String MSG_COMMAND_PROPERTY = "COMMAND"; + public static final String MSG_JSON_PROPERTY = "JSON"; + + public static final long REGISTRATION_TIMEOUT = 60000; + public static final long COMMAND_RESPONSE_TIMEOUT = 10000; + + public static final String CONTROLLER_QUEUE_JNDI_NAME = "controllerqueue"; +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java new file mode 100644 index 0000000000..d1d24dcfff --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/DistributedTestException.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +import org.apache.qpid.disttest.message.Command; + +public class DistributedTestException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + private final Command causeCommand; + + public DistributedTestException(final String message) + { + this(message, (Command) null); + } + + public DistributedTestException(final Throwable cause) + { + this(cause, null); + } + + public DistributedTestException(final String message, final Throwable cause) + { + this(message, cause, null); + } + + public DistributedTestException(final String message, final Command commandCause) + { + super(message); + causeCommand = commandCause; + } + + public DistributedTestException(final Throwable cause, final Command commandCause) + { + super(cause); + causeCommand = commandCause; + } + + public DistributedTestException(final String message, final Throwable cause, final Command commandCause) + { + super(message, cause); + causeCommand = commandCause; + } + + public Command getCauseCommand() + { + return causeCommand; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java new file mode 100644 index 0000000000..52dad9aa4d --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/Visitor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * A variation of the visitor pattern that uses reflection to call the correct + * visit method. By convention, subclasses should provide public + * <pre>visit(SpecificClass)</pre> methods. + */ +public abstract class Visitor +{ + + private static final String VISITOR_METHOD_NAME = "visit"; + + public void visit(Object targetObject) + { + Class<? extends Object> targetObjectClass = targetObject.getClass(); + final Method method = findVisitMethodForTargetObjectClass(targetObjectClass); + invokeVisitMethod(targetObject, method); + } + + private Method findVisitMethodForTargetObjectClass( + Class<? extends Object> targetObjectClass) + { + final Method method; + try + { + method = getClass().getDeclaredMethod(VISITOR_METHOD_NAME, targetObjectClass); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to find method " + VISITOR_METHOD_NAME + " on object of class " + targetObjectClass, e); + } + return method; + } + + private void invokeVisitMethod(Object targetObject, final Method method) + { + try + { + method.invoke(this, targetObject); + } + catch (IllegalArgumentException e) + { + throw new DistributedTestException(e); + } + catch (IllegalAccessException e) + { + throw new DistributedTestException(e); + } + catch (InvocationTargetException e) + { + throw new DistributedTestException(e.getCause()); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java new file mode 100644 index 0000000000..ed29182c51 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.naming.NamingException; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.Visitor; +import org.apache.qpid.disttest.jms.ClientJmsDelegate; +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Client +{ + private static final Logger LOGGER = LoggerFactory.getLogger(Client.class); + + private final ClientJmsDelegate _clientJmsDelegate; + + private final CountDownLatch _latch = new CountDownLatch(1); + private Visitor _visitor; + private final AtomicReference<ClientState> _state; + private ParticipantExecutorRegistry _participantRegistry = new ParticipantExecutorRegistry(); + + public Client(final ClientJmsDelegate delegate) throws NamingException + { + _clientJmsDelegate = delegate; + _state = new AtomicReference<ClientState>(ClientState.CREATED); + _visitor = new ClientCommandVisitor(this, _clientJmsDelegate); + } + + /** + * Register with the controller + */ + public void start() + { + _clientJmsDelegate.setInstructionListener(this); + _clientJmsDelegate.sendRegistrationMessage(); + _state.set(ClientState.READY); + } + + public void stop() + { + _state.set(ClientState.STOPPED); + _latch.countDown(); + } + + public void addParticipantExecutor(final ParticipantExecutor participant) + { + _participantRegistry.add(participant); + } + + public void waitUntilStopped() + { + waitUntilStopped(0); + } + + public void waitUntilStopped(final long timeout) + { + try + { + if (timeout == 0) + { + _latch.await(); + } + else + { + _latch.await(timeout, TimeUnit.MILLISECONDS); + } + } + catch (final InterruptedException ie) + { + Thread.currentThread().interrupt(); + } + + _clientJmsDelegate.destroy(); + } + + public void processInstruction(final Command command) + { + String responseMessage = null; + try + { + command.accept(_visitor); + } + catch (final Exception e) + { + LOGGER.error("Error processing instruction", e); + responseMessage = e.getMessage(); + } + finally + { + _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage)); + } + } + + public ClientState getState() + { + return _state.get(); + } + + public String getClientName() + { + return _clientJmsDelegate.getClientName(); + } + + public void setClientCommandVisitor(final ClientCommandVisitor visitor) + { + _visitor = visitor; + } + + public void startTest() + { + if (_state.compareAndSet(ClientState.READY, ClientState.RUNNING_TEST)) + { + try + { + _clientJmsDelegate.startConnections(); + for (final ParticipantExecutor executor : _participantRegistry.executors()) + { + executor.start(this); + } + } + catch (final Exception e) + { + try + { + tearDownTest(); + } + catch (final Exception e2) + { + // ignore + } + throw new DistributedTestException("Error starting test: " + _clientJmsDelegate.getClientName(), e); + } + } + else + { + throw new DistributedTestException("Client '" + _clientJmsDelegate.getClientName() + + "' is not in READY state:" + _state.get()); + } + } + + public void tearDownTest() + { + if (_state.compareAndSet(ClientState.RUNNING_TEST, ClientState.READY)) + { + LOGGER.info("Tearing down test on client: " + _clientJmsDelegate.getClientName()); + + _clientJmsDelegate.closeTestConnections(); + } + else + { + throw new DistributedTestException("Client '" + _clientJmsDelegate.getClientName() + "' is not in RUNNING_TEST state! Ignoring tearDownTest"); + } + + + _participantRegistry.clear(); + } + + public void sendResults(ParticipantResult testResult) + { + _clientJmsDelegate.sendResponseMessage(testResult); + LOGGER.info("Sent test results " + testResult); + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("clientJmsDelegate", _clientJmsDelegate).toString(); + } + + void setParticipantRegistry(ParticipantExecutorRegistry participantRegistry) + { + _participantRegistry = participantRegistry; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java new file mode 100644 index 0000000000..791897323e --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client; + +import org.apache.qpid.disttest.Visitor; +import org.apache.qpid.disttest.jms.ClientJmsDelegate; +import org.apache.qpid.disttest.message.CreateConnectionCommand; +import org.apache.qpid.disttest.message.CreateConsumerCommand; +import org.apache.qpid.disttest.message.CreateMessageProviderCommand; +import org.apache.qpid.disttest.message.CreateProducerCommand; +import org.apache.qpid.disttest.message.CreateSessionCommand; +import org.apache.qpid.disttest.message.NoOpCommand; +import org.apache.qpid.disttest.message.StartTestCommand; +import org.apache.qpid.disttest.message.StopClientCommand; +import org.apache.qpid.disttest.message.TearDownTestCommand; + +public class ClientCommandVisitor extends Visitor +{ + private final Client _client; + private final ClientJmsDelegate _clientJmsDelegate; + + public ClientCommandVisitor(final Client client, final ClientJmsDelegate clientJmsDelegate) + { + super(); + _client = client; + _clientJmsDelegate = clientJmsDelegate; + } + + public void visit(final StopClientCommand command) + { + _client.stop(); + } + + public void visit(final NoOpCommand command) + { + // no-op + } + + public void visit(final CreateConnectionCommand command) + { + _clientJmsDelegate.createConnection(command); + } + + public void visit(final CreateSessionCommand command) + { + _clientJmsDelegate.createSession(command); + } + + public void visit(final CreateProducerCommand command) + { + + final ProducerParticipant participant = new ProducerParticipant(_clientJmsDelegate, command); + _clientJmsDelegate.createProducer(command); + final ParticipantExecutor executor = new ParticipantExecutor(participant); + _client.addParticipantExecutor(executor); + } + + public void visit(final CreateConsumerCommand command) + { + final ConsumerParticipant participant = new ConsumerParticipant(_clientJmsDelegate, command); + _clientJmsDelegate.createConsumer(command); + final ParticipantExecutor executor = new ParticipantExecutor(participant); + _client.addParticipantExecutor(executor); + } + + public void visit(final StartTestCommand command) + { + _client.startTest(); + } + + public void visit(final TearDownTestCommand command) + { + _client.tearDownTest(); + } + + public void visit(final CreateMessageProviderCommand command) + { + _clientJmsDelegate.createMessageProvider(command); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java new file mode 100644 index 0000000000..c88c0a6c86 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientState.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client; + +public enum ClientState +{ + CREATED, READY, STOPPED, RUNNING_TEST; +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java new file mode 100644 index 0000000000..89400b1f72 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client; + + +import java.util.Date; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Message; +import javax.jms.MessageListener; + +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.jms.ClientJmsDelegate; +import org.apache.qpid.disttest.message.ConsumerParticipantResult; +import org.apache.qpid.disttest.message.CreateConsumerCommand; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsumerParticipant implements Participant +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerParticipant.class); + + private final AtomicInteger _totalNumberOfMessagesReceived = new AtomicInteger(0); + private final NavigableSet<Integer> _allConsumedPayloadSizes = new ConcurrentSkipListSet<Integer>(); + private final AtomicLong _totalPayloadSizeOfAllMessagesReceived = new AtomicLong(0); + private final CountDownLatch _asyncRunHasFinished = new CountDownLatch(1); + private final ClientJmsDelegate _jmsDelegate; + private final CreateConsumerCommand _command; + private final ParticipantResultFactory _resultFactory; + + private long _startTime; + + private volatile Exception _asyncMessageListenerException; + + public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command) + { + _jmsDelegate = delegate; + _command = command; + _resultFactory = new ParticipantResultFactory(); + } + + @Override + public ParticipantResult doIt(String registeredClientName) throws Exception + { + final Date start = new Date(); + + if (_command.getMaximumDuration() == 0 && _command.getNumberOfMessages() == 0) + { + throw new DistributedTestException("number of messages and duration cannot both be zero"); + } + + if (_command.isSynchronous()) + { + synchronousRun(); + } + else + { + _jmsDelegate.registerListener(_command.getParticipantName(), new MessageListener(){ + + @Override + public void onMessage(Message message) + { + processAsynchMessage(message); + } + + }); + + waitUntilMsgListenerHasFinished(); + rethrowAnyAsyncMessageListenerException(); + } + + Date end = new Date(); + int numberOfMessagesSent = _totalNumberOfMessagesReceived.get(); + long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get(); + int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes); + + ConsumerParticipantResult result = _resultFactory.createForConsumer( + getName(), + registeredClientName, + _command, + numberOfMessagesSent, + payloadSize, + totalPayloadSize, + start, + end); + + return result; + } + + private void synchronousRun() + { + LOGGER.debug("entered synchronousRun: " + this); + + _startTime = System.currentTimeMillis(); + + Message message = null; + + do + { + message = _jmsDelegate.consumeMessage(_command.getParticipantName(), + _command.getReceiveTimeout()); + } while (processMessage(message)); + } + + /** + * @return whether to continue running (ie returns false if the message quota has been reached) + */ + private boolean processMessage(Message message) + { + int messageCount = _totalNumberOfMessagesReceived.incrementAndGet(); + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("message " + messageCount + " received by " + this); + } + int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message); + _allConsumedPayloadSizes.add(messagePayloadSize); + _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize); + + boolean batchEnabled = _command.getBatchSize() > 0; + boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0; + + if (!batchEnabled || batchComplete) + { + _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); + } + + boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages(); + boolean reachedMaximumDuration = _command.getMaximumDuration() > 0 && System.currentTimeMillis() - _startTime >= _command.getMaximumDuration(); + boolean finishedConsuming = reachedExpectedNumberOfMessages || reachedMaximumDuration; + + if (finishedConsuming) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("message " + messageCount + + " reachedExpectedNumberOfMessages " + reachedExpectedNumberOfMessages + + " reachedMaximumDuration " + reachedMaximumDuration); + } + + if (batchEnabled && !batchComplete) + { + // commit/acknowledge remaining messages if necessary + _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); + } + return false; + } + + return true; + } + + + /** + * Intended to be called from a {@link MessageListener}. Updates {@link #_asyncRunHasFinished} if + * no more messages should be processed, causing {@link #doIt(String)} to exit. + */ + public void processAsynchMessage(Message message) + { + boolean continueRunning = true; + try + { + if (_startTime == 0) + { + // reset counter and start time on receiving of first message + _startTime = System.currentTimeMillis(); + } + + continueRunning = processMessage(message); + } + catch (Exception e) + { + LOGGER.error("Error occured consuming message " + _totalNumberOfMessagesReceived, e); + continueRunning = false; + _asyncMessageListenerException = e; + } + + if(!continueRunning) + { + _asyncRunHasFinished.countDown(); + } + } + + @Override + public void releaseResources() + { + _jmsDelegate.closeTestConsumer(_command.getParticipantName()); + } + + private int getPayloadSizeForResultIfConstantOrZeroOtherwise(NavigableSet<Integer> allSizes) + { + return allSizes.size() == 1 ? _allConsumedPayloadSizes.first() : 0; + } + + private void rethrowAnyAsyncMessageListenerException() + { + if (_asyncMessageListenerException != null) + { + throw new DistributedTestException(_asyncMessageListenerException); + } + } + + private void waitUntilMsgListenerHasFinished() throws Exception + { + LOGGER.debug("waiting until message listener has finished for " + this); + _asyncRunHasFinished.await(); + LOGGER.debug("Message listener has finished for " + this); + } + + @Override + public String getName() + { + return _command.getParticipantName(); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java new file mode 100644 index 0000000000..2dcf8940b6 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/MessageProvider.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client; + +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.client.property.PropertyValue; +import org.apache.qpid.disttest.message.CreateProducerCommand; + +public class MessageProvider +{ + public static final String TTL = "ttl"; + + public static final String DELIVERY_MODE = "deliveryMode"; + + public static final String PRIORITY = "priority"; + + public static final String[] STANDARD_JMS_PROPERTIES = { "correlationID", DELIVERY_MODE, + "expiration", "messageID", PRIORITY, "redelivered", "replyTo", "timestamp", "type", TTL }; + + private Map<String, PropertyValue> _messageProperties; + private ConcurrentMap<Integer, Future<String>> _payloads; + + public MessageProvider(Map<String, PropertyValue> messageProperties) + { + _messageProperties = messageProperties; + _payloads = new ConcurrentHashMap<Integer, Future<String>>(); + } + + public Message nextMessage(Session session, CreateProducerCommand command) throws JMSException + { + Message message = createTextMessage(session, command); + setMessageProperties(message); + return message; + } + + public boolean isPropertySet(String name) + { + return _messageProperties != null && _messageProperties.containsKey(name); + } + + public void setMessageProperties(Message message) throws JMSException + { + if (_messageProperties != null) + { + for (Entry<String, PropertyValue> entry : _messageProperties.entrySet()) + { + String propertyName = entry.getKey(); + Object propertyValue = entry.getValue().getValue(); + if (isStandardProperty(propertyName)) + { + setStandardProperty(message, propertyName, propertyValue); + } + else + { + setCustomProperty(message, propertyName, propertyValue); + } + } + } + } + + protected void setCustomProperty(Message message, String propertyName, Object propertyValue) throws JMSException + { + if (propertyValue instanceof Integer) + { + message.setIntProperty(propertyName, ((Integer) propertyValue).intValue()); + } + else if (propertyValue instanceof Long) + { + message.setLongProperty(propertyName, ((Long) propertyValue).longValue()); + } + else if (propertyValue instanceof Boolean) + { + message.setBooleanProperty(propertyName, ((Boolean) propertyValue).booleanValue()); + } + else if (propertyValue instanceof Byte) + { + message.setByteProperty(propertyName, ((Byte) propertyValue).byteValue()); + } + else if (propertyValue instanceof Double) + { + message.setDoubleProperty(propertyName, ((Double) propertyValue).doubleValue()); + } + else if (propertyValue instanceof Float) + { + message.setFloatProperty(propertyName, ((Float) propertyValue).floatValue()); + } + else if (propertyValue instanceof Short) + { + message.setShortProperty(propertyName, ((Short) propertyValue).shortValue()); + } + else if (propertyValue instanceof String) + { + message.setStringProperty(propertyName, (String) propertyValue); + } + else + { + message.setObjectProperty(propertyName, propertyValue); + } + } + + protected void setStandardProperty(Message message, String property, Object propertyValue) throws JMSException + { + String propertyName = "JMS" + StringUtils.capitalize(property); + try + { + BeanUtils.setProperty(message, propertyName, propertyValue); + } + catch (IllegalAccessException e) + { + throw new DistributedTestException("Unable to set property " + propertyName + " :" + e.getMessage(), e); + } + catch (InvocationTargetException e) + { + if (e.getCause() instanceof JMSException) + { + throw ((JMSException) e.getCause()); + } + else + { + throw new DistributedTestException("Unable to set property " + propertyName + " :" + e.getMessage(), e); + } + } + } + + protected boolean isStandardProperty(String propertyName) + { + for (int i = 0; i < STANDARD_JMS_PROPERTIES.length; i++) + { + if (propertyName.equals(STANDARD_JMS_PROPERTIES[i])) + { + return true; + } + } + return false; + } + + protected Message createTextMessage(Session ssn, final CreateProducerCommand command) throws JMSException + { + String payload = getMessagePayload(command); + TextMessage msg = ssn.createTextMessage(); + msg.setText(payload); + return msg; + } + + protected String getMessagePayload(final CreateProducerCommand command) + { + FutureTask<String> createTextFuture = new FutureTask<String>(new Callable<String>() + { + @Override + public String call() throws Exception + { + return StringUtils.repeat("a", command.getMessageSize()); + } + }); + + Future<String> future = _payloads.putIfAbsent(command.getMessageSize(), createTextFuture); + if (future == null) + { + createTextFuture.run(); + future = createTextFuture; + } + String payload = null; + try + { + payload = future.get(); + } + catch (Exception e) + { + throw new DistributedTestException("Unable to create message payload :" + e.getMessage(), e); + } + return payload; + } + + @Override + public String toString() + { + return "MessageProvider [_messageProperties=" + _messageProperties + "]"; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java new file mode 100644 index 0000000000..941ec90565 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Participant.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client; + +import org.apache.qpid.disttest.message.ParticipantResult; + +public interface Participant +{ + ParticipantResult doIt(String registeredClientName) throws Exception; + + void releaseResources(); + + String getName(); + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java new file mode 100644 index 0000000000..dee1391868 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutor.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ParticipantExecutor +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantExecutor.class); + + private static final ExecutorService SHARED_UNBOUNDED_THREAD_POOL = Executors.newCachedThreadPool(new DaemonThreadFactory()); + + private Executor _executor = SHARED_UNBOUNDED_THREAD_POOL; + + private Client _client; + + private final Participant _participant; + + private final ParticipantResultFactory _factory; + + public ParticipantExecutor(Participant participant) + { + _participant = participant; + _factory = new ParticipantResultFactory(); + } + + /** + * Schedules the test participant to be run in a background thread. + */ + public void start(Client client) + { + _client = client; + + LOGGER.info("Starting test participant in background thread: " + this); + _executor.execute(new ParticipantRunnable()); + } + + public String getParticipantName() + { + return _participant.getName(); + } + + void setExecutor(Executor executor) + { + _executor = executor; + } + + private class ParticipantRunnable implements Runnable + { + @Override + public final void run() + { + Thread currentThread = Thread.currentThread(); + final String initialThreadName = currentThread.getName(); + currentThread.setName(initialThreadName + "-" + getParticipantName()); + + try + { + runParticipantAndSendResults(); + } + finally + { + currentThread.setName(initialThreadName); + } + } + + private void runParticipantAndSendResults() + { + ParticipantResult result = null; + try + { + result = _participant.doIt(_client.getClientName()); + } + catch (Throwable t) + { + String errorMessage = "Unhandled error: " + t.getMessage(); + LOGGER.error(errorMessage, t); + result = _factory.createForError(_participant.getName(), _client.getClientName(), errorMessage); + } + finally + { + _client.sendResults(result); + _participant.releaseResources(); + } + } + } + + private static final class DaemonThreadFactory implements ThreadFactory + { + @Override + public Thread newThread(Runnable r) + { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + } + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("participantName", getParticipantName()) + .append("client", _client) + .toString(); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java new file mode 100644 index 0000000000..3d9780e640 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantExecutorRegistry.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class ParticipantExecutorRegistry +{ + private final Set<ParticipantExecutor> _participantExecutors = Collections.synchronizedSet(new HashSet<ParticipantExecutor>()); + + public void add(ParticipantExecutor participantExecutor) + { + _participantExecutors.add(participantExecutor); + } + + public void clear() + { + _participantExecutors.clear(); + } + + public Collection<ParticipantExecutor> executors() + { + return Collections.unmodifiableSet(_participantExecutors); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java new file mode 100644 index 0000000000..61b64b8c4f --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client; + +import java.util.Date; + +import org.apache.qpid.disttest.message.ConsumerParticipantResult; +import org.apache.qpid.disttest.message.CreateConsumerCommand; +import org.apache.qpid.disttest.message.CreateParticpantCommand; +import org.apache.qpid.disttest.message.CreateProducerCommand; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.ProducerParticipantResult; + +public class ParticipantResultFactory +{ + public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, CreateConsumerCommand command, int numberOfMessagesReceived, int payloadSize, long totalPayloadReceived, Date start, Date end) + { + ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult(); + + setTestProperties(consumerParticipantResult, command, participantName, clientRegisteredName); + setTestResultProperties(consumerParticipantResult, numberOfMessagesReceived, payloadSize, totalPayloadReceived, start, end); + + consumerParticipantResult.setTopic(command.isTopic()); + consumerParticipantResult.setDurableSubscription(command.isDurableSubscription()); + consumerParticipantResult.setBrowsingSubscription(command.isBrowsingSubscription()); + consumerParticipantResult.setSelector(command.getSelector() != null); + consumerParticipantResult.setNoLocal(command.isNoLocal()); + consumerParticipantResult.setSynchronousConsumer(command.isSynchronous()); + + return consumerParticipantResult; + } + + public ProducerParticipantResult createForProducer(String participantName, String clientRegisteredName, CreateProducerCommand command, int numberOfMessagesSent, int payloadSize, long totalPayloadSent, Date start, Date end) + { + final ProducerParticipantResult participantResult = new ProducerParticipantResult(); + + participantResult.setStartDelay(command.getStartDelay()); + participantResult.setDeliveryMode(command.getDeliveryMode()); + participantResult.setPriority(command.getPriority()); + participantResult.setInterval(command.getInterval()); + participantResult.setTimeToLive(command.getTimeToLive()); + + setTestProperties(participantResult, command, participantName, clientRegisteredName); + + setTestResultProperties(participantResult, numberOfMessagesSent, payloadSize, totalPayloadSent, start, end); + + return participantResult; + } + + private void setTestResultProperties(final ParticipantResult participantResult, int numberOfMessagesSent, int payloadSize, long totalPayloadReceived, Date start, Date end) + { + participantResult.setNumberOfMessagesProcessed(numberOfMessagesSent); + participantResult.setPayloadSize(payloadSize); + participantResult.setTotalPayloadProcessed(totalPayloadReceived); + participantResult.setStartDate(start); + participantResult.setEndDate(end); + } + + private void setTestProperties(final ParticipantResult participantResult, CreateParticpantCommand command, String participantName, String clientRegisteredName) + { + participantResult.setParticipantName(participantName); + participantResult.setRegisteredClientName(clientRegisteredName); + participantResult.setBatchSize(command.getBatchSize()); + participantResult.setMaximumDuration(command.getMaximumDuration()); + + } + + public ParticipantResult createForError(String participantName, String clientRegisteredName, String errorMessage) + { + ParticipantResult result = new ParticipantResult(); + result.setParticipantName(participantName); + result.setRegisteredClientName(clientRegisteredName); + result.setErrorMessage(errorMessage); + + return result; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java new file mode 100644 index 0000000000..a46794f380 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ProducerParticipant.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.client; + +import java.util.Date; +import java.util.NavigableSet; +import java.util.TreeSet; + +import javax.jms.Message; + +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.jms.ClientJmsDelegate; +import org.apache.qpid.disttest.message.CreateProducerCommand; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerParticipant implements Participant +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ProducerParticipant.class); + + private final ClientJmsDelegate _jmsDelegate; + + private final CreateProducerCommand _command; + + private ParticipantResultFactory _resultFactory; + + public ProducerParticipant(final ClientJmsDelegate jmsDelegate, final CreateProducerCommand command) + { + _jmsDelegate = jmsDelegate; + _command = command; + _resultFactory = new ParticipantResultFactory(); + } + + @Override + public ParticipantResult doIt(String registeredClientName) throws Exception + { + if (_command.getMaximumDuration() == 0 && _command.getNumberOfMessages() == 0) + { + throw new DistributedTestException("number of messages and duration cannot both be zero"); + } + + long expectedDuration = _command.getMaximumDuration() - _command.getStartDelay(); + + doSleepForStartDelay(); + + final long startTime = System.currentTimeMillis(); + + Message lastPublishedMessage = null; + int numberOfMessagesSent = 0; + long totalPayloadSizeOfAllMessagesSent = 0; + NavigableSet<Integer> allProducedPayloadSizes = new TreeSet<Integer>(); + + while (true) + { + numberOfMessagesSent++; + + lastPublishedMessage = _jmsDelegate.sendNextMessage(_command); + + int lastPayloadSize = _jmsDelegate.calculatePayloadSizeFrom(lastPublishedMessage); + totalPayloadSizeOfAllMessagesSent += lastPayloadSize; + allProducedPayloadSizes.add(lastPayloadSize); + + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("message " + numberOfMessagesSent + " sent by " + this); + } + + final boolean batchLimitReached = _command.getBatchSize() <= 0 + || numberOfMessagesSent % _command.getBatchSize() == 0; + + if (batchLimitReached) + { + _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); + + if (_command.getInterval() > 0) + { + // sleep for given time + Thread.sleep(_command.getInterval()); + } + } + + if (_command.getNumberOfMessages() > 0 && numberOfMessagesSent >= _command.getNumberOfMessages() + || expectedDuration > 0 && System.currentTimeMillis() - startTime >= expectedDuration) + { + break; + } + } + + // commit the remaining batch messages + if (_command.getBatchSize() > 0 && numberOfMessagesSent % _command.getBatchSize() != 0) + { + _jmsDelegate.commitOrAcknowledgeMessage(lastPublishedMessage, _command.getSessionName()); + } + + Date start = new Date(startTime); + Date end = new Date(); + int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(allProducedPayloadSizes); + + return _resultFactory.createForProducer( + getName(), + registeredClientName, + _command, + numberOfMessagesSent, + payloadSize, + totalPayloadSizeOfAllMessagesSent, start, end); + } + + private int getPayloadSizeForResultIfConstantOrZeroOtherwise(NavigableSet<Integer> allPayloadSizes) + { + return allPayloadSizes.size() == 1 ? allPayloadSizes.first() : 0; + } + + private void doSleepForStartDelay() + { + if (_command.getStartDelay() > 0) + { + // start delay is specified. Sleeping... + try + { + Thread.sleep(_command.getStartDelay()); + } + catch (final InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public void releaseResources() + { + _jmsDelegate.closeTestProducer(_command.getParticipantName()); + } + + @Override + public String getName() + { + return _command.getParticipantName(); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java new file mode 100644 index 0000000000..a49ebf756e --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertySupport.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +/** + * Provides support to generate message property values. + */ +public abstract class GeneratedPropertySupport implements GeneratedPropertyValue +{ + private Object _lastValue; + + public GeneratedPropertySupport() + { + super(); + _lastValue = null; + } + + @Override + public Object getValue() + { + Object result = nextValue(); + result = evaluate(result); + synchronized(this) + { + _lastValue = result; + } + return result; + } + + protected Object evaluate(Object result) + { + while (result instanceof PropertyValue) + { + result = ((PropertyValue)result).getValue(); + } + return result; + } + + public abstract Object nextValue(); + + public synchronized Object getLastValue() + { + return _lastValue; + } + + @Override + public String toString() + { + return "GeneratedPropertyValue [value=" + getLastValue() + ", @def=" + getDefinition() + "]"; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java new file mode 100644 index 0000000000..39c093fac5 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/GeneratedPropertyValue.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +/** + * Provides operations to generate message property values. + */ +public interface GeneratedPropertyValue extends PropertyValue +{ + public String getDefinition(); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java new file mode 100644 index 0000000000..4444351976 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/ListPropertyValue.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Provides property values from the underlining list of items. + */ +public class ListPropertyValue extends GeneratedPropertySupport +{ + public static final String DEF_VALUE = "list"; + private List<PropertyValue> _items; + private boolean _cyclic; + private int _currentIndex; + + public ListPropertyValue() + { + super(); + _cyclic = true; + _currentIndex = 0; + _items = new ArrayList<PropertyValue>(); + } + + public synchronized void setItems(List<PropertyValue> items) + { + _items = new ArrayList<PropertyValue>(items); + } + + public synchronized List<PropertyValue> getItems() + { + return Collections.unmodifiableList(_items); + } + + public synchronized void setCyclic(boolean cyclic) + { + _cyclic = cyclic; + } + + public synchronized boolean isCyclic() + { + return _cyclic; + } + + @Override + public synchronized Object nextValue() + { + if (_currentIndex >= _items.size()) + { + if (_cyclic) + { + _currentIndex = 0; + } + else + { + _currentIndex = _items.size() -1; + } + } + Object nextValue = _items.get(_currentIndex); + _currentIndex++; + return nextValue; + } + + @Override + public String getDefinition() + { + return DEF_VALUE; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + _currentIndex; + result = prime * result + (_cyclic ? 1231 : 1237); + result = prime * result + ((_items == null) ? 0 : _items.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null || !(obj instanceof ListPropertyValue)) + { + return false; + } + ListPropertyValue other = (ListPropertyValue) obj; + if (_cyclic != other._cyclic) + { + return false; + } + if (_items == null && other._items != null) + { + return false; + } + return _items.equals(other._items); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java new file mode 100644 index 0000000000..e0ae137c35 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/NumericGeneratedPropertySupport.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +import java.util.Arrays; + +/** + * Provides support for numeric generators with lower and upper boundaries. + */ +public abstract class NumericGeneratedPropertySupport extends GeneratedPropertySupport +{ + public static final Class<?>[] SUPPORTED_TYPES = { double.class, float.class, int.class, long.class, short.class, + byte.class }; + + private String _type; + private double _upper; + private double _lower; + + + public NumericGeneratedPropertySupport() + { + super(); + _type = SUPPORTED_TYPES[0].getName(); + _upper = Double.MAX_VALUE; + _lower = 0.0; + } + + public synchronized String getType() + { + return _type; + } + + public synchronized double getUpper() + { + return _upper; + } + + public synchronized double getLower() + { + return _lower; + } + + public synchronized void setUpper(double upper) + { + _upper = upper; + } + + public synchronized void setLower(double lower) + { + _lower = lower; + } + + public synchronized void setType(String type) + { + _type = toClass(type).getName(); + } + + protected Class<?> toClass(String type) + { + Class<?> t = null; + for (int i = 0; i < SUPPORTED_TYPES.length; i++) + { + if (SUPPORTED_TYPES[i].getName().equals(type)) + { + t = SUPPORTED_TYPES[i]; + break; + } + } + if (t == null) + { + throw new IllegalArgumentException("Type " + type + " is not supported: " + + Arrays.toString(SUPPORTED_TYPES)); + } + return t; + } + + @Override + public Object nextValue() + { + double result = nextDouble(); + return doubleToNumber(result, toClass(_type)); + } + + protected Number doubleToNumber(double value, Class<?> targetType) + { + Number result = null; + if (targetType == double.class) + { + result = new Double(value); + } + else if (targetType == float.class) + { + result = new Float(value); + } + else if (targetType == int.class) + { + result = new Integer((int) value); + } + else if (targetType == long.class) + { + result = new Long((long) value); + } + else if (targetType == short.class) + { + result = new Short((short) value); + } + else if (targetType == byte.class) + { + result = new Byte((byte) value); + } + else + { + throw new IllegalArgumentException("Type " + targetType + " is not supported: " + + Arrays.toString(SUPPORTED_TYPES)); + } + return result; + } + + protected abstract double nextDouble(); + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + long temp; + temp = Double.doubleToLongBits(_lower); + result = prime * result + (int) (temp ^ (temp >>> 32)); + result = prime * result + ((_type == null) ? 0 : _type.hashCode()); + temp = Double.doubleToLongBits(_upper); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null || !(obj instanceof NumericGeneratedPropertySupport)) + { + return false; + } + NumericGeneratedPropertySupport other = (NumericGeneratedPropertySupport) obj; + if (Double.doubleToLongBits(_lower) != Double.doubleToLongBits(other._lower) + || Double.doubleToLongBits(_upper) != Double.doubleToLongBits(other._upper)) + { + return false; + } + if (_type == null && other._type != null) + { + return false; + } + else if (!_type.equals(other._type)) + { + return false; + } + return true; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java new file mode 100644 index 0000000000..97adc0cee1 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValue.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +/** + * Provides operations to get a message property value. + */ +public interface PropertyValue +{ + public Object getValue(); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java new file mode 100644 index 0000000000..fa44b2da1e --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/PropertyValueFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +import org.apache.commons.lang.StringUtils; +import org.apache.qpid.disttest.DistributedTestException; + +/** + * Creates property value instances using given alias (type) value. + */ +public class PropertyValueFactory +{ + public PropertyValue createPropertyValue(String type) + { + try + { + return (PropertyValue)getPropertyValueClass(type).newInstance(); + } + catch(Exception e) + { + throw new DistributedTestException("Unable to create a generator for a type:" + type, e); + } + } + + public Class<?> getPropertyValueClass(String type) throws ClassNotFoundException + { + String className = "org.apache.qpid.disttest.client.property." + StringUtils.capitalize(type) + "PropertyValue"; + return Class.forName(className); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java new file mode 100644 index 0000000000..4f44a4bca8 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RandomPropertyValue.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +/** + * Generates random property values in a given lower and upper boundaries. + */ +public class RandomPropertyValue extends NumericGeneratedPropertySupport +{ + public static final String DEF_VALUE = "random"; + + public RandomPropertyValue() + { + super(); + } + + @Override + protected double nextDouble() + { + double lower = getLower(); + double upper = getUpper(); + return lower + Math.random() * (upper - lower); + } + + @Override + public String getDefinition() + { + return DEF_VALUE; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java new file mode 100644 index 0000000000..3aca4d4bca --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/RangePropertyValue.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +/** + * Generates property values from a range with given lower and upper boundaries. + */ +public class RangePropertyValue extends NumericGeneratedPropertySupport +{ + public static final String DEF_VALUE = "range"; + private double _step; + private double _currentValue; + private boolean _cyclic; + + public RangePropertyValue() + { + super(); + _cyclic = true; + _currentValue = 0.0; + _step = 1.0; + } + + public synchronized double getStep() + { + return _step; + } + + public synchronized boolean isCyclic() + { + return _cyclic; + } + + public synchronized void setCyclic(boolean cyclic) + { + _cyclic = cyclic; + } + + public synchronized void setStep(double step) + { + _step = step; + } + + @Override + public synchronized double nextDouble() + { + double result = 0.0; + double lower = getLower(); + double upper = getUpper(); + if (_currentValue < lower) + { + _currentValue = lower; + } + else if (_currentValue > upper) + { + if (_cyclic) + { + _currentValue = lower; + } + else + { + _currentValue = upper; + } + } + result = _currentValue; + _currentValue += _step; + return result; + } + + @Override + public String getDefinition() + { + return DEF_VALUE; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = super.hashCode(); + long temp; + temp = Double.doubleToLongBits(_currentValue); + result = prime * result + (int) (temp ^ (temp >>> 32)); + result = prime * result + (_cyclic ? 1231 : 1237); + temp = Double.doubleToLongBits(_step); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (!(obj instanceof RangePropertyValue)) + { + return false; + } + if (!super.equals(obj)) + { + return false; + } + RangePropertyValue other = (RangePropertyValue) obj; + if (Double.doubleToLongBits(_currentValue) != Double.doubleToLongBits(other._currentValue) + || Double.doubleToLongBits(_step) != Double.doubleToLongBits(other._step) || _cyclic != other._cyclic) + { + return false; + } + + return true; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java new file mode 100644 index 0000000000..9141e68656 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/property/SimplePropertyValue.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.client.property; + +/** + * Simple property value holder for a constant properties. + */ +public class SimplePropertyValue implements PropertyValue +{ + private Object _value; + + public SimplePropertyValue() + { + super(); + } + + public SimplePropertyValue(Object value) + { + super(); + this._value = value; + } + + @Override + public Object getValue() + { + return _value; + } + + @Override + public String toString() + { + return "SimplePropertyValue [value=" + _value + "]"; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_value == null) ? 0 : _value.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null || getClass() != obj.getClass()) + { + return false; + } + SimplePropertyValue other = (SimplePropertyValue) obj; + if (_value == null && other._value != null) + { + return false; + } + return _value.equals(other._value); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java new file mode 100644 index 0000000000..b049da1f84 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ClientRegistry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.qpid.disttest.DistributedTestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClientRegistry +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ClientRegistry.class); + + private final Set<String> _registeredClientNames = new ConcurrentSkipListSet<String>(); + + public void registerClient(String clientName) + { + final boolean alreadyContainsClient = !_registeredClientNames.add(clientName); + if (alreadyContainsClient) + { + throw new DistributedTestException("Duplicate client name " + clientName); + } + + LOGGER.info("Client registered: " + clientName); + } + + public Collection<String> getClients() + { + return Collections.unmodifiableSet(_registeredClientNames); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java new file mode 100644 index 0000000000..6c0c253807 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandForClient.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller; + +import org.apache.qpid.disttest.message.Command; + +public class CommandForClient +{ + private String _clientName; + private Command _command; + + public CommandForClient(String clientName, Command command) + { + _clientName = clientName; + _command = command; + } + + public String getClientName() + { + return _clientName; + } + + public Command getCommand() + { + return _command; + } + + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java new file mode 100644 index 0000000000..e2f40bebe8 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/CommandListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller; + +import org.apache.qpid.disttest.message.Command; + +public interface CommandListener +{ + + public abstract void processCommand(Command command); + + public boolean supports(Command command); + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java new file mode 100644 index 0000000000..a5e0933704 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/Controller.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.controller.config.Config; +import org.apache.qpid.disttest.controller.config.TestInstance; +import org.apache.qpid.disttest.jms.ControllerJmsDelegate; +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CommandType; +import org.apache.qpid.disttest.message.RegisterClientCommand; +import org.apache.qpid.disttest.message.Response; +import org.apache.qpid.disttest.message.StopClientCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Controller +{ + private static final Logger LOGGER = LoggerFactory.getLogger(Controller.class); + + private final long _registrationTimeout; + private final long _commandResponseTimeout; + + private final ControllerJmsDelegate _jmsDelegate; + + private volatile CountDownLatch _clientRegistrationLatch; + private volatile CountDownLatch _stopClientsResponseLatch = null; + + private Config _config; + private TestRunnerFactory _testRunnerFactory; + private ClientRegistry _clientRegistry; + + private long _testResultTimeout = TestRunner.WAIT_FOREVER; + + public Controller(final ControllerJmsDelegate jmsDelegate, long registrationTimeout, long commandResponseTimeout) + { + _jmsDelegate = jmsDelegate; + _registrationTimeout = registrationTimeout; + _commandResponseTimeout = commandResponseTimeout; + _testRunnerFactory = new TestRunnerFactory(); + _clientRegistry = new ClientRegistry(); + } + + public void setConfig(Config config) + { + _config = config; + validateConfiguration(); + int numberOfClients = config.getTotalNumberOfClients(); + _clientRegistrationLatch = new CountDownLatch(numberOfClients); + + _jmsDelegate.addCommandListener(new RegisterClientCommandListener()); + _jmsDelegate.addCommandListener(new StopClientResponseListener()); + _jmsDelegate.start(); + } + + + public void awaitClientRegistrations() + { + LOGGER.info("Awaiting client registration"); + awaitLatch(_clientRegistrationLatch, _registrationTimeout, "Timed out waiting for registrations. Expecting %d more registrations"); + } + + private void validateConfiguration() + { + if (_config == null || _config.getTotalNumberOfClients() == 0) + { + throw new DistributedTestException("No controller config or no clients specified in test config"); + } + } + + private void awaitLatch(CountDownLatch latch, long timeout, String messageWithOneDecimalPlaceholder) + { + try + { + final boolean countedDownOK = latch.await(timeout, TimeUnit.MILLISECONDS); + if (!countedDownOK) + { + final long latchCount = latch.getCount(); + String formattedMessage = String.format(messageWithOneDecimalPlaceholder, latchCount); + throw new DistributedTestException(formattedMessage); + } + } + catch (final InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + + public void registerClient(final RegisterClientCommand registrationCommand) + { + final String clientName = registrationCommand.getClientName(); + + _clientRegistry.registerClient(clientName); + _jmsDelegate.registerClient(registrationCommand); + + _clientRegistrationLatch.countDown(); + LOGGER.info("Counted down latch for client: " + clientName + " latch count=" + _clientRegistrationLatch.getCount()); + } + + void processStopClientResponse(final Response response) + { + _stopClientsResponseLatch.countDown(); + if (response.hasError()) + { + LOGGER.error("Client " + response.getRegisteredClientName() + " reported exception in response to command : " + + response.getErrorMessage()); + } + } + + public void stopAllRegisteredClients() + { + Collection<String> registeredClients = _clientRegistry.getClients(); + + LOGGER.info("Stopping all clients"); + _stopClientsResponseLatch = new CountDownLatch(registeredClients.size()); + Command command = new StopClientCommand(); + for (final String clientName : registeredClients) + { + _jmsDelegate.sendCommandToClient(clientName, command); + } + + awaitLatch(_stopClientsResponseLatch, _commandResponseTimeout, "Timed out waiting for stop command responses. Expecting %d more responses."); + + LOGGER.info("Stopped all clients"); + } + + + public ResultsForAllTests runAllTests() + { + LOGGER.info("Running all tests"); + + ResultsForAllTests resultsForAllTests = new ResultsForAllTests(); + + for (TestInstance testInstance : _config.getTests()) + { + + ParticipatingClients participatingClients = new ParticipatingClients(_clientRegistry, testInstance.getClientNames()); + + LOGGER.info("Running test " + testInstance + ". Participating clients: " + participatingClients.getRegisteredNames()); + TestRunner runner = _testRunnerFactory.createTestRunner(participatingClients, + testInstance, + _jmsDelegate, + _commandResponseTimeout, + _testResultTimeout); + + TestResult testResult = runner.run(); + LOGGER.info("Finished test " + testInstance); + + resultsForAllTests.add(testResult); + } + + return resultsForAllTests; + } + + private final class StopClientResponseListener implements CommandListener + { + @Override + public boolean supports(Command command) + { + return command.getType() == CommandType.RESPONSE && ((Response)command).getInReplyToCommandType() == CommandType.STOP_CLIENT; + } + + @Override + public void processCommand(Command command) + { + processStopClientResponse((Response)command); + } + } + + private final class RegisterClientCommandListener implements + CommandListener + { + @Override + public boolean supports(Command command) + { + return command.getType() == CommandType.REGISTER_CLIENT; + } + + @Override + public void processCommand(Command command) + { + registerClient((RegisterClientCommand)command); + } + } + + public void setTestResultTimeout(final long testResultTimeout) + { + _testResultTimeout = testResultTimeout; + + } + + void setClientRegistry(ClientRegistry clientRegistry) + { + _clientRegistry = clientRegistry; + + } + + void setTestRunnerFactory(TestRunnerFactory factory) + { + if (factory == null) + { + throw new IllegalArgumentException("TestRunnerFactory cannot be null!"); + } + _testRunnerFactory = factory; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java new file mode 100644 index 0000000000..077d628697 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ParticipatingClients.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller; + +import java.util.Collection; +import java.util.List; +import java.util.TreeSet; + +import org.apache.commons.collections.BidiMap; +import org.apache.commons.collections.bidimap.DualHashBidiMap; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + +public class ParticipatingClients +{ + private final BidiMap _configuredToRegisteredNameMap; + + public ParticipatingClients(ClientRegistry clientRegistry, List<String> configuredClientNamesForTest) + { + _configuredToRegisteredNameMap = mapConfiguredToRegisteredClientNames(configuredClientNamesForTest, clientRegistry); + } + + public String getRegisteredNameFromConfiguredName(String clientConfiguredName) + { + String registeredClientName = (String) _configuredToRegisteredNameMap.get(clientConfiguredName); + if (registeredClientName == null) + { + throw new IllegalArgumentException("Unrecognised client configured name " + clientConfiguredName + + " Mapping is " + _configuredToRegisteredNameMap); + } + return registeredClientName; + } + + public String getConfiguredNameFromRegisteredName(String registeredClientName) + { + String clientConfiguredName = (String) _configuredToRegisteredNameMap.getKey(registeredClientName); + if (clientConfiguredName == null) + { + throw new IllegalArgumentException("Unrecognised client registered name " + registeredClientName + + " Mapping is " + _configuredToRegisteredNameMap); + } + + return clientConfiguredName; + } + + private BidiMap mapConfiguredToRegisteredClientNames(List<String> configuredClientNamesForTest, ClientRegistry clientRegistry) + { + BidiMap configuredToRegisteredNameMap = new DualHashBidiMap(); + + TreeSet<String> registeredClients = new TreeSet<String>(clientRegistry.getClients()); + for (String configuredClientName : configuredClientNamesForTest) + { + String allocatedClientName = registeredClients.pollFirst(); + if (allocatedClientName == null) + { + throw new IllegalArgumentException("Too few clients in registry " + clientRegistry + " configured clients " + configuredClientNamesForTest); + } + configuredToRegisteredNameMap.put(configuredClientName, allocatedClientName); + } + + return configuredToRegisteredNameMap; + } + + @SuppressWarnings("unchecked") + public Collection<String> getRegisteredNames() + { + return _configuredToRegisteredNameMap.values(); + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("configuredToRegisteredNameMap", _configuredToRegisteredNameMap).toString(); + } + + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java new file mode 100644 index 0000000000..6c5ff3450c --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/ResultsForAllTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.disttest.results.aggregation.ITestResult; + +public class ResultsForAllTests +{ + private List<ITestResult> _results = new ArrayList<ITestResult>(); + private boolean _hasErrors; + + public List<ITestResult> getTestResults() + { + return _results; + } + + public void add(ITestResult testResult) + { + _results.add(testResult); + if(testResult.hasErrors()) + { + _hasErrors = true; + } + } + + public boolean hasErrors() + { + return _hasErrors; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java new file mode 100644 index 0000000000..756c641532 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestResult.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.results.aggregation.ITestResult; + +public class TestResult implements ITestResult +{ + private final SortedSet<ParticipantResult> _participantResults = Collections.synchronizedSortedSet( + new TreeSet<ParticipantResult>(ParticipantResult.PARTICIPANT_NAME_COMPARATOR)); + + private boolean _hasErrors; + private String _name; + + public TestResult(String name) + { + _name = name; + } + + @Override + public List<ParticipantResult> getParticipantResults() + { + List<ParticipantResult> list = new ArrayList<ParticipantResult>(_participantResults); + return Collections.unmodifiableList(list); + } + + public void addParticipantResult(ParticipantResult participantResult) + { + _participantResults.add(participantResult); + if(participantResult.hasError()) + { + _hasErrors = true; + } + } + + @Override + public boolean hasErrors() + { + return _hasErrors; + } + + @Override + public String getName() + { + return _name; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java new file mode 100644 index 0000000000..f81b691ea6 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunner.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.apache.qpid.disttest.controller.config.TestInstance; +import org.apache.qpid.disttest.jms.ControllerJmsDelegate; +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CommandType; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.Response; +import org.apache.qpid.disttest.message.StartTestCommand; +import org.apache.qpid.disttest.message.TearDownTestCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestRunner +{ + private static final Logger LOGGER = LoggerFactory.getLogger(TestRunner.class); + + private static final long PARTICIPANT_RESULTS_LOG_INTERVAL = 60000; + public static final long WAIT_FOREVER = -1; + + private final long _commandResponseTimeout; + + private final Set<CommandType> _setOfResponsesToExpect = Collections.synchronizedSet(new HashSet<CommandType>()); + + private final ParticipatingClients _participatingClients; + + private final TestInstance _testInstance; + private ControllerJmsDelegate _jmsDelegate; + + private volatile CountDownLatch _commandResponseLatch = null; + private final CountDownLatch _testResultsLatch; + private final TestResult _testResult; + + /** Length of time to await test results or {@value #WAIT_FOREVER} */ + private final long _testResultTimeout; + + + public TestRunner(ParticipatingClients participatingClients, TestInstance testInstance, ControllerJmsDelegate jmsDelegate, long commandResponseTimeout, long testResultTimeout) + { + _participatingClients = participatingClients; + _testInstance = testInstance; + _jmsDelegate = jmsDelegate; + _commandResponseTimeout = commandResponseTimeout; + _testResultsLatch = new CountDownLatch(testInstance.getTotalNumberOfParticipants()); + _testResultTimeout = testResultTimeout; + _testResult = new TestResult(testInstance.getName()); + } + + public TestResult run() + { + final ParticipantResultListener participantResultListener = new ParticipantResultListener(); + TestCommandResponseListener testCommandResponseListener = new TestCommandResponseListener(); + + try + { + _jmsDelegate.addCommandListener(testCommandResponseListener); + _jmsDelegate.addCommandListener(participantResultListener); + + runParts(); + + return _testResult; + } + finally + { + _jmsDelegate.removeCommandListener(participantResultListener); + _jmsDelegate.removeCommandListener(testCommandResponseListener); + } + } + + private void runParts() + { + boolean queuesCreated = false; + try + { + createQueues(); + queuesCreated = true; + sendTestSetupCommands(); + awaitCommandResponses(); + sendCommandToParticipatingClients(new StartTestCommand()); + awaitCommandResponses(); + + awaitTestResults(); + + sendCommandToParticipatingClients(new TearDownTestCommand()); + awaitCommandResponses(); + } + finally + { + if (queuesCreated) + { + deleteQueues(); + } + } + } + + void createQueues() + { + List<QueueConfig> queues = _testInstance.getQueues(); + if (!queues.isEmpty()) + { + _jmsDelegate.createQueues(queues); + } + } + + void sendTestSetupCommands() + { + List<CommandForClient> commandsForAllClients = _testInstance.createCommands(); + _commandResponseLatch = new CountDownLatch(commandsForAllClients.size()); + for (CommandForClient commandForClient : commandsForAllClients) + { + String configuredClientName = commandForClient.getClientName(); + String registeredClientName = _participatingClients.getRegisteredNameFromConfiguredName(configuredClientName); + + Command command = commandForClient.getCommand(); + sendCommandInternal(registeredClientName, command); + } + } + + void awaitCommandResponses() + { + awaitLatch(_commandResponseLatch, _commandResponseTimeout, "Timed out waiting for command responses. Expecting %d more responses."); + } + + + void processCommandResponse(final Response response) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Received response for command " + response); + } + + _commandResponseLatch.countDown(); + checkForResponseError(response); + } + + + void awaitTestResults() + { + long timeout = _testResultTimeout; + DistributedTestException lastException = null; + + boolean waitForever = _testResultTimeout == WAIT_FOREVER; + final long interval = waitForever ? PARTICIPANT_RESULTS_LOG_INTERVAL : Math.min(PARTICIPANT_RESULTS_LOG_INTERVAL, _testResultTimeout); + + while(_testResultsLatch.getCount() > 0 && (waitForever || timeout > 0)) + { + try + { + awaitLatch(_testResultsLatch, interval, "Waiting for participant results... Expecting %d more responses."); + } + catch (DistributedTestException e) + { + lastException = e; + LOGGER.info(e.getMessage()); + } + + if (!waitForever) + { + timeout =- interval; + } + } + + if (_testResultsLatch.getCount() > 0) + { + throw lastException; + } + } + + void deleteQueues() + { + List<QueueConfig> queues = _testInstance.getQueues(); + if (!queues.isEmpty()) + { + _jmsDelegate.deleteQueues(queues); + } + } + + void sendCommandToParticipatingClients(final Command command) + { + Collection<String> participatingRegisteredClients = _participatingClients.getRegisteredNames(); + _commandResponseLatch = new CountDownLatch(participatingRegisteredClients.size()); + for (final String clientName : participatingRegisteredClients) + { + sendCommandInternal(clientName, command); + } + } + + public void processParticipantResult(ParticipantResult result) + { + setOriginalTestDetailsOn(result); + + _testResult.addParticipantResult(result); + LOGGER.info("Received result " + result); + + _testResultsLatch.countDown(); + checkForResponseError(result); + } + + private void setOriginalTestDetailsOn(ParticipantResult result) + { + // Client knows neither the configured client name nor test name + String registeredClientName = result.getRegisteredClientName(); + String configuredClient = _participatingClients.getConfiguredNameFromRegisteredName(registeredClientName); + + result.setConfiguredClientName(configuredClient); + result.setTestName(_testInstance.getName()); + result.setIterationNumber(_testInstance.getIterationNumber()); + } + + private void sendCommandInternal(String registeredClientName, Command command) + { + _setOfResponsesToExpect.add(command.getType()); + _jmsDelegate.sendCommandToClient(registeredClientName, command); + } + + private void awaitLatch(CountDownLatch latch, long timeout, String messageWithOneDecimalPlaceholder) + { + try + { + final boolean countedDownOK = latch.await(timeout, TimeUnit.MILLISECONDS); + if (!countedDownOK) + { + final long latchCount = latch.getCount(); + String formattedMessage = String.format(messageWithOneDecimalPlaceholder, latchCount); + throw new DistributedTestException(formattedMessage); + } + } + catch (final InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + + private void checkForResponseError(final Response response) + { + if (response.hasError()) + { + LOGGER.error("Client " + response.getRegisteredClientName() + " reported error " + response); + } + } + + final class ParticipantResultListener implements CommandListener + { + @Override + public boolean supports(Command command) + { + return command instanceof ParticipantResult; + } + + @Override + public void processCommand(Command command) + { + processParticipantResult((ParticipantResult) command); + + } + } + + final class TestCommandResponseListener implements CommandListener + { + @Override + public void processCommand(Command command) + { + processCommandResponse((Response)command); + } + + @Override + public boolean supports(Command command) + { + CommandType type = command.getType(); + if (type == CommandType.RESPONSE) + { + Response response = (Response)command; + return _setOfResponsesToExpect.contains(response.getInReplyToCommandType()); + } + return false; + } + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java new file mode 100644 index 0000000000..bf0e5afb9c --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/TestRunnerFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller; + +import org.apache.qpid.disttest.controller.config.TestInstance; +import org.apache.qpid.disttest.jms.ControllerJmsDelegate; + +public class TestRunnerFactory +{ + public TestRunner createTestRunner(ParticipatingClients participatingClients, TestInstance testInstance, ControllerJmsDelegate jmsDelegate, long commandResponseTimeout, long testResultTimeout) + { + return new TestRunner(participatingClients, testInstance, jmsDelegate, commandResponseTimeout, testResultTimeout); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java new file mode 100644 index 0000000000..4353a85cd3 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ClientConfig.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.disttest.controller.CommandForClient; +import org.apache.qpid.disttest.message.Command; + +public class ClientConfig +{ + /* + * TODO add this field when repeating groups of clients need to be used. Talk to Phil and Keith! + * private int _instances; + */ + + private List<ConnectionConfig> _connections; + private List<MessageProviderConfig> _messageProviders; + private String _name; + + public ClientConfig() + { + _name = null; + _connections = Collections.emptyList(); + _messageProviders = Collections.emptyList(); + } + + public ClientConfig(String name, ConnectionConfig... connections) + { + this(name, Arrays.asList(connections), null); + } + + public ClientConfig(String name, List<ConnectionConfig> connections, List<MessageProviderConfig> messageProviders) + { + _name = name; + _connections = connections; + if (messageProviders == null) + { + _messageProviders = Collections.emptyList(); + } + else + { + _messageProviders = messageProviders; + } + } + + public String getName() + { + return _name; + } + + public List<ConnectionConfig> getConnections() + { + return Collections.unmodifiableList(_connections); + } + + public List<CommandForClient> createCommands() + { + List<CommandForClient> commandsForClient = new ArrayList<CommandForClient>(); + + for (MessageProviderConfig messageProvider : _messageProviders) + { + Command command = messageProvider.createCommand(); + commandsForClient.add(new CommandForClient(_name, command)); + } + for (ConnectionConfig connection : _connections) + { + List<Command> commands = connection.createCommands(); + for (Command command : commands) + { + commandsForClient.add(new CommandForClient(_name, command)); + } + } + return commandsForClient; + } + + public int getTotalNumberOfParticipants() + { + int numOfParticipants = 0; + for (ConnectionConfig connection : _connections) + { + numOfParticipants = numOfParticipants + connection.getTotalNumberOfParticipants(); + } + return numOfParticipants; + } + + public List<MessageProviderConfig> getMessageProviders() + { + return Collections.unmodifiableList(_messageProviders); + } + + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java new file mode 100644 index 0000000000..fe56137276 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/Config.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class Config +{ + private List<TestConfig> _tests; + + public Config() + { + super(); + _tests = Collections.emptyList(); + } + + public Config(TestConfig... tests) + { + _tests = Arrays.asList(tests); + } + + public List<TestInstance> getTests() + { + List<TestInstance> testInstances = new ArrayList<TestInstance>(); + for (TestConfig testConfig : _tests) + { + int iterationNumber = 0; + + List<IterationValue> iterationValues = testConfig.getIterationValues(); + if(iterationValues.isEmpty()) + { + testInstances.add(new TestInstance(testConfig)); + } + else + { + for (IterationValue iterationValue : iterationValues) + { + testInstances.add(new TestInstance(testConfig, iterationNumber, iterationValue)); + iterationNumber++; + } + } + } + + return Collections.unmodifiableList(testInstances); + } + + List<TestConfig> getTestConfigs() + { + return Collections.unmodifiableList(_tests); + } + + public int getTotalNumberOfClients() + { + int numberOfClients = 0; + for (TestConfig testConfig : _tests) + { + numberOfClients = Math.max(testConfig.getTotalNumberOfClients(), numberOfClients); + } + return numberOfClients; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java new file mode 100644 index 0000000000..6288b42eac --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConfigReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller.config; + +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.Reader; + +import org.apache.qpid.disttest.client.property.PropertyValue; +import org.apache.qpid.disttest.json.PropertyValueAdapter; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class ConfigReader +{ + + public Config getConfigFromFile(String fileName) throws FileNotFoundException + { + FileReader reader = new FileReader(fileName); + + Config config = readConfig(reader); + return config; + } + + public Config readConfig(Reader reader) + { + Gson gson = new GsonBuilder() + .registerTypeAdapter(PropertyValue.class, new PropertyValueAdapter()) + .create(); + Config config = gson.fromJson(reader, Config.class); + return config; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java new file mode 100644 index 0000000000..e2cc31e21e --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConnectionConfig.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CreateConnectionCommand; + +public class ConnectionConfig +{ + private String _name; + private List<SessionConfig> _sessions; + private String _factory; + + // For Gson + public ConnectionConfig() + { + super(); + _sessions = Collections.emptyList(); + } + + public ConnectionConfig(String name, String factory, SessionConfig... sessions) + { + super(); + _name = name; + _factory = factory; + _sessions = Arrays.asList(sessions); + + } + + public List<SessionConfig> getSessions() + { + return Collections.unmodifiableList(_sessions); + } + + public String getName() + { + return _name; + } + + public List<Command> createCommands() + { + List<Command> commands = new ArrayList<Command>(); + commands.add(createCommand()); + for (SessionConfig sessionConfig : _sessions) + { + commands.addAll(sessionConfig.createCommands(_name)); + } + return commands; + } + + private CreateConnectionCommand createCommand() + { + CreateConnectionCommand command = new CreateConnectionCommand(); + command.setConnectionName(_name); + command.setConnectionFactoryName(_factory); + return command; + } + + public int getTotalNumberOfParticipants() + { + int numOfParticipants = 0; + + for (SessionConfig sessionConfig : _sessions) + { + numOfParticipants = numOfParticipants + sessionConfig.getTotalNumberOfParticipants(); + } + return numOfParticipants; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java new file mode 100644 index 0000000000..ed47e02667 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java @@ -0,0 +1,65 @@ +package org.apache.qpid.disttest.controller.config; + +import org.apache.qpid.disttest.message.CreateConsumerCommand; + +public class ConsumerConfig extends ParticipantConfig +{ + private boolean _isTopic; + private boolean _isDurableSubscription; + private boolean _isBrowsingSubscription; + private String _selector; + private boolean _noLocal; + private boolean _synchronous; + + // For Gson + public ConsumerConfig() + { + _isTopic = false; + _isDurableSubscription = false; + _isBrowsingSubscription = false; + _selector = null; + _noLocal = false; + _synchronous = true; + } + + public ConsumerConfig( + String consumerName, + String destinationName, + long numberOfMessages, + int batchSize, + long maximumDuration, + boolean isTopic, + boolean isDurableSubscription, + boolean isBrowsingSubscription, + String selector, + boolean noLocal, + boolean synchronous) + { + super(consumerName, destinationName, numberOfMessages, batchSize, maximumDuration); + + _isTopic = isTopic; + _isDurableSubscription = isDurableSubscription; + _isBrowsingSubscription = isBrowsingSubscription; + _selector = selector; + _noLocal = noLocal; + _synchronous = synchronous; + } + + public CreateConsumerCommand createCommand(String sessionName) + { + CreateConsumerCommand createConsumerCommand = new CreateConsumerCommand(); + + setParticipantProperties(createConsumerCommand); + + createConsumerCommand.setSessionName(sessionName); + createConsumerCommand.setTopic(_isTopic); + createConsumerCommand.setDurableSubscription(_isDurableSubscription); + createConsumerCommand.setBrowsingSubscription(_isBrowsingSubscription); + createConsumerCommand.setSelector(_selector); + createConsumerCommand.setNoLocal(_noLocal); + createConsumerCommand.setSynchronous(_synchronous); + + return createConsumerCommand; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java new file mode 100644 index 0000000000..ef953a5d07 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/IterationValue.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller.config; + +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.qpid.disttest.message.Command; + +public class IterationValue +{ + private final Map<String, String> _iterationPropertyValuesWithUnderscores; + + public IterationValue(Map<String, String> iterationMap) + { + _iterationPropertyValuesWithUnderscores = iterationMap; + } + + public IterationValue() + { + _iterationPropertyValuesWithUnderscores = Collections.emptyMap(); + } + + public Map<String, String> getIterationPropertyValuesWithUnderscores() + { + return _iterationPropertyValuesWithUnderscores; + } + + public void applyToCommand(Command command) + { + try + { + Map<String, String> withoutUnderscoresToMatchCommandPropertyNames = getIterationPropertyValuesWithoutUnderscores(); + BeanUtilsBean.getInstance().copyProperties(command, withoutUnderscoresToMatchCommandPropertyNames); + } + catch (IllegalAccessException e) + { + throw new RuntimeException("Couldn't copy properties from iteration " + this + " to " + command, e); + } + catch (InvocationTargetException e) + { + throw new RuntimeException("Couldn't copy properties from iteration " + this + " to " + command, e); + } + } + + private Map<String, String> getIterationPropertyValuesWithoutUnderscores() + { + Map<String, String> iterationPropertyValues = new HashMap<String, String>(); + for (String propertyNameWithUnderscore : _iterationPropertyValuesWithUnderscores.keySet()) + { + String propertyName = propertyNameWithUnderscore.replaceFirst("_", ""); + String propertyValue = _iterationPropertyValuesWithUnderscores.get(propertyNameWithUnderscore); + + iterationPropertyValues.put(propertyName, propertyValue); + } + return iterationPropertyValues; + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("iterationMap", _iterationPropertyValuesWithUnderscores).toString(); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java new file mode 100644 index 0000000000..318ec7f045 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/MessageProviderConfig.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.Map; + +import org.apache.qpid.disttest.client.property.PropertyValue; +import org.apache.qpid.disttest.message.CreateMessageProviderCommand; + +public class MessageProviderConfig +{ + private String _name; + private Map<String, PropertyValue> _messageProperties; + + public MessageProviderConfig() + { + super(); + } + + public MessageProviderConfig(String name, Map<String, PropertyValue> messageProperties) + { + super(); + _name = name; + _messageProperties = messageProperties; + } + + public String getName() + { + return _name; + } + + public Map<String, PropertyValue> getMessageProperties() + { + return _messageProperties; + } + + public CreateMessageProviderCommand createCommand() + { + CreateMessageProviderCommand command = new CreateMessageProviderCommand(); + command.setProviderName(_name); + command.setMessageProperties(_messageProperties); + return command; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java new file mode 100644 index 0000000000..31037a3038 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ParticipantConfig.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller.config; + +import org.apache.qpid.disttest.message.CreateParticpantCommand; + +public abstract class ParticipantConfig +{ + private String _destinationName; + private long _numberOfMessages; + private String _name; + private int _batchSize; + private long _maximumDuration; + + // For GSON + public ParticipantConfig() + { + _name = null; + _destinationName = null; + _numberOfMessages = 0; + _batchSize = 0; + _maximumDuration = 0; + } + + public ParticipantConfig( + String name, + String destinationName, + long numberOfMessages, + int batchSize, + long maximumDuration) + { + _name = name; + _destinationName = destinationName; + _numberOfMessages = numberOfMessages; + _batchSize = batchSize; + _maximumDuration = maximumDuration; + } + + protected void setParticipantProperties(CreateParticpantCommand createParticipantCommand) + { + createParticipantCommand.setParticipantName(_name); + createParticipantCommand.setDestinationName(_destinationName); + createParticipantCommand.setNumberOfMessages(_numberOfMessages); + createParticipantCommand.setBatchSize(_batchSize); + createParticipantCommand.setMaximumDuration(_maximumDuration); + } + +}
\ No newline at end of file diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java new file mode 100644 index 0000000000..7806528a8c --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ProducerConfig.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller.config; + +import javax.jms.Message; + +import org.apache.qpid.disttest.message.CreateProducerCommand; + +public class ProducerConfig extends ParticipantConfig +{ + private int _deliveryMode; + private int _messageSize; + private int _priority; + private long _timeToLive; + private long _interval; + private long _startDelay; + private String _messageProviderName; + + // For Gson + public ProducerConfig() + { + _deliveryMode = Message.DEFAULT_DELIVERY_MODE; + _messageSize = 0; + _priority = Message.DEFAULT_PRIORITY; + _timeToLive = Message.DEFAULT_TIME_TO_LIVE; + _interval = 0; + _startDelay = 0; + _messageProviderName = null; + } + + public ProducerConfig( + String producerName, + String destinationName, + long numberOfMessages, + int batchSize, + long maximumDuration, + int deliveryMode, + int messageSize, + int priority, + long timeToLive, + long interval, + long startDelay, + String messageProviderName) + { + super(producerName, destinationName, numberOfMessages, batchSize, maximumDuration); + + _deliveryMode = deliveryMode; + _messageSize = messageSize; + _priority = priority; + _timeToLive = timeToLive; + _interval = interval; + _startDelay = startDelay; + _messageProviderName = messageProviderName; + } + + public CreateProducerCommand createCommand(String sessionName) + { + CreateProducerCommand command = new CreateProducerCommand(); + + setParticipantProperties(command); + + command.setSessionName(sessionName); + command.setDeliveryMode(_deliveryMode); + command.setMessageSize(_messageSize); + command.setPriority(_priority); + command.setTimeToLive(_timeToLive); + command.setInterval(_interval); + command.setStartDelay(_startDelay); + command.setMessageProviderName(_messageProviderName); + + return command; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java new file mode 100644 index 0000000000..cffc2b7c50 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/QueueConfig.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.Collections; +import java.util.Map; + +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; + +public class QueueConfig +{ + private String _name; + private boolean _durable; + private Map<String, Object> _attributes; + + public QueueConfig() + { + super(); + _attributes = Collections.emptyMap(); + } + + public QueueConfig(String name, boolean durable, Map<String, Object> attributes) + { + super(); + this._name = name; + this._durable = durable; + this._attributes = attributes; + } + + public String getName() + { + return _name; + } + + // TODO x-qpid-capacity and x-qpid-flow-resume-capacity need to be typed as numeric but we currrently + // pass these as a string. + public Map<String, Object> getAttributes() + { + return _attributes; + } + + public boolean isDurable() + { + return _durable; + } + + @Override + public String toString() + { + return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java new file mode 100644 index 0000000000..12372e5391 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/SessionConfig.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import javax.jms.Session; + +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CreateSessionCommand; + +public class SessionConfig +{ + private static final List<ProducerConfig> EMPTY_PRODUCER_LIST = Collections.emptyList(); + private static final List<ConsumerConfig> EMPTY_CONSUMER_LIST = Collections.emptyList(); + + private int _acknowledgeMode; + private String _sessionName; + private List<ProducerConfig> _producers; + private List<ConsumerConfig> _consumers; + + // For Gson + public SessionConfig() + { + this(null, Session.SESSION_TRANSACTED, EMPTY_CONSUMER_LIST, EMPTY_PRODUCER_LIST); + } + + public SessionConfig(String sessionName, int acknowledgeMode, ProducerConfig...producers) + { + this(sessionName, acknowledgeMode, EMPTY_CONSUMER_LIST, Arrays.asList(producers)); + } + + public SessionConfig(String sessionName, int acknowledgeMode, ConsumerConfig... consumers) + { + this(sessionName, acknowledgeMode, Arrays.asList(consumers), EMPTY_PRODUCER_LIST); + } + + public SessionConfig(String sessionName, int acknowledgeMode, List<ConsumerConfig> consumers, List<ProducerConfig> producers) + { + _sessionName = sessionName; + _acknowledgeMode = acknowledgeMode; + _consumers = consumers; + _producers = producers; + } + + public int getAcknowledgeMode() + { + return _acknowledgeMode; + } + + public String getSessionName() + { + return _sessionName; + } + + public List<ProducerConfig> getProducers() + { + return Collections.unmodifiableList(_producers); + } + + public List<ConsumerConfig> getConsumers() + { + return Collections.unmodifiableList(_consumers); + } + + public List<Command> createCommands(String connectionName) + { + List<Command> commands = new ArrayList<Command>(); + commands.add(createCommand(connectionName)); + for (ProducerConfig producer : _producers) + { + commands.add(producer.createCommand(_sessionName)); + } + for (ConsumerConfig consumer : _consumers) + { + commands.add(consumer.createCommand(_sessionName)); + } + return commands; + } + + private CreateSessionCommand createCommand(String connectionName) + { + CreateSessionCommand command = new CreateSessionCommand(); + command.setAcknowledgeMode(_acknowledgeMode); + command.setConnectionName(connectionName); + command.setSessionName(_sessionName); + return command; + } + + public int getTotalNumberOfParticipants() + { + return _producers.size() + _consumers.size(); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java new file mode 100644 index 0000000000..2bb5f1b289 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestConfig.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.disttest.controller.CommandForClient; + +public class TestConfig +{ + private final String _name; + + private final List<ClientConfig> _clients; + + private final List<QueueConfig> _queues; + + private final List<Map<String, String>> _iterations; + + public TestConfig() + { + _clients = Collections.emptyList(); + _queues = Collections.emptyList(); + _name = null; + _iterations = Collections.emptyList(); + } + + public TestConfig(String name, ClientConfig[] clients, QueueConfig[] queues) + { + _clients = Arrays.asList(clients); + _queues = Arrays.asList(queues); + _name = name; + _iterations = Collections.emptyList(); + } + + public List<String> getClientNames() + { + List<String> clientNames = new ArrayList<String>(); + for (ClientConfig clientConfig : _clients) + { + clientNames.add(clientConfig.getName()); + } + return clientNames; + } + + public int getTotalNumberOfClients() + { + return _clients.size(); + } + + public int getTotalNumberOfParticipants() + { + int numOfParticipants = 0; + for (ClientConfig client : _clients) + { + numOfParticipants = numOfParticipants + client.getTotalNumberOfParticipants(); + } + return numOfParticipants; + } + + public List<CommandForClient> createCommands() + { + List<CommandForClient> commandsForClients = new ArrayList<CommandForClient>(); + for (ClientConfig client : _clients) + { + commandsForClients.addAll(client.createCommands()); + } + + return Collections.unmodifiableList(commandsForClients); + } + + public List<QueueConfig> getQueues() + { + return Collections.unmodifiableList(_queues); + } + + public String getName() + { + return _name; + } + + public List<IterationValue> getIterationValues() + { + List<IterationValue> iterationValues = new ArrayList<IterationValue>(); + for (Map<String, String> iterationMap : _iterations) + { + iterationValues.add(new IterationValue(iterationMap)); + } + + return iterationValues; + } + + public List<ClientConfig> getClients() + { + return Collections.unmodifiableList(_clients); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java new file mode 100644 index 0000000000..9f555ef4da --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/TestInstance.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.controller.config; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.qpid.disttest.controller.CommandForClient; +import org.apache.qpid.disttest.message.Command; + +public class TestInstance +{ + private static final IterationValue EMPTY_ITERATION_VALUES = new IterationValue(); + + private TestConfig _testConfig; + private IterationValue _iterationValue; + private int _iterationNumber; + + public TestInstance(TestConfig testConfig, int iterationNumber, IterationValue iterationValue) + { + _testConfig = testConfig; + _iterationNumber = iterationNumber; + _iterationValue = iterationValue; + } + + public TestInstance(TestConfig testConfig) + { + this(testConfig, 0, EMPTY_ITERATION_VALUES); + } + + public List<CommandForClient> createCommands() + { + List<CommandForClient> commands = _testConfig.createCommands(); + List<CommandForClient> newCommands = new ArrayList<CommandForClient>(commands.size()); + + for (CommandForClient commandForClient : commands) + { + String clientName = commandForClient.getClientName(); + Command command = commandForClient.getCommand(); + + _iterationValue.applyToCommand(command); + + newCommands.add(new CommandForClient(clientName, command)); + } + + return newCommands; + + } + + public String getName() + { + return _testConfig.getName(); + } + + public int getIterationNumber() + { + return _iterationNumber; + } + + public int getTotalNumberOfParticipants() + { + return _testConfig.getTotalNumberOfParticipants(); + } + + public List<QueueConfig> getQueues() + { + return _testConfig.getQueues(); + } + + public List<String> getClientNames() + { + return _testConfig.getClientNames(); + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("testName", getName()) + .append("iterationNumber", _iterationNumber) + .toString(); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java new file mode 100644 index 0000000000..a3c2c57473 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.jms; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.NamingException; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.qpid.disttest.DistributedTestConstants; +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.client.Client; +import org.apache.qpid.disttest.client.MessageProvider; +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CreateConnectionCommand; +import org.apache.qpid.disttest.message.CreateConsumerCommand; +import org.apache.qpid.disttest.message.CreateMessageProviderCommand; +import org.apache.qpid.disttest.message.CreateProducerCommand; +import org.apache.qpid.disttest.message.CreateSessionCommand; +import org.apache.qpid.disttest.message.RegisterClientCommand; +import org.apache.qpid.disttest.message.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClientJmsDelegate +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ClientJmsDelegate.class); + + private final Context _context; + private final Destination _controllerQueue; + private final Connection _controllerConnection; + private final Session _controllerSession; + private final MessageProducer _controlQueueProducer; + + private final String _clientName; + private Queue _instructionQueue; + + private Map<String, Connection> _testConnections; + private Map<String, Session> _testSessions; + private Map<String, MessageProducer> _testProducers; + private Map<String, MessageConsumer> _testConsumers; + private Map<String, MessageProvider> _testMessageProviders; + + private final MessageProvider _defaultMessageProvider; + + public ClientJmsDelegate(final Context context) + { + try + { + _context = context; + final ConnectionFactory connectionFactory = (ConnectionFactory) _context.lookup("connectionfactory"); + _controllerConnection = connectionFactory.createConnection(); + _controllerConnection.start(); + _controllerQueue = (Destination) context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME); + _controllerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _controlQueueProducer = _controllerSession.createProducer(_controllerQueue); + _clientName = UUID.randomUUID().toString(); + _testConnections = new HashMap<String, Connection>(); + _testSessions = new HashMap<String, Session>(); + _testProducers = new HashMap<String, MessageProducer>(); + _testConsumers = new HashMap<String, MessageConsumer>(); + _testMessageProviders = new HashMap<String, MessageProvider>(); + _defaultMessageProvider = new MessageProvider(null); + } + catch (final NamingException ne) + { + throw new DistributedTestException("Unable to create client jms delegate", ne); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to create client jms delegate", jmse); + } + } + + public void setInstructionListener(final Client client) + { + try + { + _instructionQueue = _controllerSession.createTemporaryQueue(); + final MessageConsumer instructionConsumer = _controllerSession.createConsumer(_instructionQueue); + instructionConsumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(final Message message) + { + client.processInstruction(JmsMessageAdaptor.messageToCommand(message)); + } + }); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to setup instruction listener", jmse); + } + } + + public void sendRegistrationMessage() + { + Command command; + try + { + command = new RegisterClientCommand(_clientName, _instructionQueue.getQueueName()); + } + catch (final JMSException e) + { + throw new DistributedTestException(e); + } + sendCommand(command); + } + + public void sendResponseMessage(final Response responseMessage) + { + sendCommand(responseMessage); + } + + private void sendCommand(final Command command) + { + try + { + final Message message = JmsMessageAdaptor.commandToMessage(_controllerSession, command); + _controlQueueProducer.send(message); + LOGGER.debug("Sent message for " + command.getType() + ". message id: " + message.getJMSMessageID()); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to send command: " + command, jmse); + } + } + + public void createConnection(final CreateConnectionCommand command) + { + try + { + final ConnectionFactory connectionFactory = (ConnectionFactory) _context.lookup(command + .getConnectionFactoryName()); + final Connection newConnection = connectionFactory.createConnection(); + addConnection(command.getConnectionName(), newConnection); + } + catch (final NamingException ne) + { + throw new DistributedTestException("Unable to lookup factoryName: " + command.getConnectionFactoryName(), + ne); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to create connection: " + command.getConnectionName() + + " (using factory name: " + command.getConnectionFactoryName() + ")", jmse); + } + } + + public void createSession(final CreateSessionCommand command) + { + try + { + final Connection connection = _testConnections.get(command.getConnectionName()); + if (connection == null) + { + throw new DistributedTestException("No test connection found called: " + command.getConnectionName(), + command); + } + final boolean transacted = command.getAcknowledgeMode() == Session.SESSION_TRANSACTED; + final Session newSession = connection.createSession(transacted, command.getAcknowledgeMode()); + addSession(command.getSessionName(), newSession); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to create new session: " + command, jmse); + } + } + + public void createProducer(final CreateProducerCommand command) + { + try + { + final Session session = _testSessions.get(command.getSessionName()); + if (session == null) + { + throw new DistributedTestException("No test session found called: " + command.getSessionName(), command); + } + final Destination destination = session.createQueue(command.getDestinationName()); + final MessageProducer jmsProducer = session.createProducer(destination); + if (command.getPriority() != -1) + { + jmsProducer.setPriority(command.getPriority()); + } + if (command.getTimeToLive() > 0) + { + jmsProducer.setTimeToLive(command.getTimeToLive()); + } + + if (command.getDeliveryMode() == DeliveryMode.NON_PERSISTENT + || command.getDeliveryMode() == DeliveryMode.PERSISTENT) + { + jmsProducer.setDeliveryMode(command.getDeliveryMode()); + } + + addProducer(command.getParticipantName(), jmsProducer); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to create new producer: " + command, jmse); + } + + } + + public void createConsumer(final CreateConsumerCommand command) + { + try + { + final Session session = _testSessions.get(command.getSessionName()); + if (session == null) + { + throw new DistributedTestException("No test session found called: " + command.getSessionName(), command); + } + final Destination destination = command.isTopic() ? session.createTopic(command.getDestinationName()) + : session.createQueue(command.getDestinationName()); + final MessageConsumer jmsConsumer = session.createConsumer(destination, command.getSelector()); + + _testConsumers.put(command.getParticipantName(), jmsConsumer); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to create new consumer: " + command, jmse); + } + } + + /** + * destroy the client. Don't call from the Dispatcher thread. + */ + public void destroy() + { + try + { + // Stopping the connection allows in-flight onMessage calls to + // finish. + _controllerConnection.stop(); + + if (_controllerSession != null) + { + _controllerSession.close(); + } + if (_controllerConnection != null) + { + _controllerConnection.close(); + } + + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to destroy cleanly", jmse); + } + } + + public Destination getControllerQueue() + { + return _controllerQueue; + } + + public String getClientName() + { + return _clientName; + } + + public int getNoOfTestConnections() + { + return _testConnections.size(); + } + + public int getNoOfTestSessions() + { + return _testSessions.size(); + } + + public int getNoOfTestProducers() + { + return _testProducers.size(); + } + + public int getNoOfTestConsumers() + { + return _testConsumers.size(); + } + + public void startConnections() + { + // start connections for consumers + // it would be better if we could track consumer connections and start + // only those + if (!_testConsumers.isEmpty()) + { + for (final Map.Entry<String, Connection> entry : _testConnections.entrySet()) + { + final Connection connection = entry.getValue(); + try + { + connection.start(); + } + catch (final JMSException e) + { + throw new DistributedTestException("Failed to start connection '" + entry.getKey() + "' :" + + e.getLocalizedMessage()); + } + } + } + } + + public void commitOrAcknowledgeMessage(final Message message, final String sessionName) + { + try + { + final Session session = _testSessions.get(sessionName); + if (session.getTransacted()) + { + session.commit(); + } + else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + message.acknowledge(); + } + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to commit or acknowledge message on session: " + + sessionName, jmse); + } + } + + public Message sendNextMessage(final CreateProducerCommand command) + { + Message sentMessage = null; + MessageProvider messageProvider = _testMessageProviders.get(command.getMessageProviderName()); + if (messageProvider == null) + { + messageProvider = _defaultMessageProvider; + } + + final Session session = _testSessions.get(command.getSessionName()); + final MessageProducer producer = _testProducers.get(command.getParticipantName()); + try + { + sentMessage = messageProvider.nextMessage(session, command); + int deliveryMode = producer.getDeliveryMode(); + int priority = producer.getPriority(); + long ttl = producer.getTimeToLive(); + if (messageProvider.isPropertySet(MessageProvider.PRIORITY)) + { + priority = sentMessage.getJMSPriority(); + } + if (messageProvider.isPropertySet(MessageProvider.DELIVERY_MODE)) + { + deliveryMode = sentMessage.getJMSDeliveryMode(); + } + if (messageProvider.isPropertySet(MessageProvider.TTL)) + { + ttl = sentMessage.getLongProperty(MessageProvider.TTL); + } + producer.send(sentMessage, deliveryMode, priority, ttl); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to create and send message with producer: " + + command.getParticipantName() + " on session: " + command.getSessionName(), jmse); + } + return sentMessage; + } + + protected void addSession(final String sessionName, final Session newSession) + { + _testSessions.put(sessionName, newSession); + } + + protected void addConnection(final String connectionName, final Connection newConnection) + { + _testConnections.put(connectionName, newConnection); + } + + protected void addProducer(final String producerName, final MessageProducer jmsProducer) + { + _testProducers.put(producerName, jmsProducer); + } + + public Message consumeMessage(String consumerName, long receiveInterval) + { + Message consumedMessage = null; + MessageConsumer consumer = _testConsumers.get(consumerName); + try + { + consumedMessage = consumer.receive(receiveInterval); + } + catch (JMSException e) + { + throw new DistributedTestException("Unable to consume message with consumer: " + consumerName, e); + } + return consumedMessage; + } + + public void registerListener(String consumerName, MessageListener messageListener) + { + MessageConsumer consumer = _testConsumers.get(consumerName); + try + { + consumer.setMessageListener(messageListener); + } + catch (JMSException e) + { + throw new DistributedTestException("Unable to register message listener with consumer: " + consumerName, e); + } + } + + public void rollbackOrRecover(String sessionName) + { + try + { + final Session session = _testSessions.get(sessionName); + if (session.getTransacted()) + { + session.rollback(); + } + else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + session.recover(); + } + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to rollback or recover on session: " + + sessionName, jmse); + } + } + + public void releaseMessage(String sessionName) + { + try + { + final Session session = _testSessions.get(sessionName); + if (session.getTransacted()) + { + session.rollback(); + } + else + { + session.recover(); + } + } + catch (final JMSException jmse) + { + LOGGER.warn("Unable to rollback or recover on session: " + sessionName, jmse); + } + } + + @Override + public String toString() + { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("clientName", _clientName).toString(); + } + + public void closeTestConnections() + { + StringBuilder jmsErrorMessages = new StringBuilder(); + int failedCloseCounter = 0; + for (final Map.Entry<String, Connection> entry : _testConnections.entrySet()) + { + final Connection connection = entry.getValue(); + try + { + connection.close(); + } + catch (final JMSException e) + { + LOGGER.error("Failed to close connection '" + entry.getKey() + "' :" + e.getLocalizedMessage(), e); + failedCloseCounter++; + if (jmsErrorMessages.length() > 0) + { + jmsErrorMessages.append('\n'); + } + jmsErrorMessages.append(e.getMessage()); + } + } + _testConnections.clear(); + _testSessions.clear(); + _testProducers.clear(); + _testConsumers.clear(); + if (failedCloseCounter > 0) + { + throw new DistributedTestException("Close failed for " + failedCloseCounter + " connection(s) with the following errors: " + jmsErrorMessages.toString()); + } + } + + public void closeTestConsumer(String consumerName) + { + MessageConsumer consumer = _testConsumers.get(consumerName); + if (consumer != null) + { + try + { + consumer.close(); + LOGGER.info("Closed test consumer " + consumerName); + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to close consumer: " + consumerName, e); + } + } + } + + public void closeTestProducer(String producerName) + { + MessageProducer producer = _testProducers.get(producerName); + if (producer != null) + { + try + { + producer.close(); + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to close producer: " + producerName, e); + } + } + } + + public int calculatePayloadSizeFrom(Message message) + { + try + { + if (message != null && message instanceof TextMessage) + { + return ((TextMessage) message).getText().getBytes().length; + } + // TODO support other message types + return 0; + } + catch (JMSException e) + { + throw new DistributedTestException("Unable to determine the payload size for message " + message, e); + } + } + + public void createMessageProvider(CreateMessageProviderCommand command) + { + _testMessageProviders.put(command.getProviderName(), new MessageProvider(command.getMessageProperties())); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java new file mode 100644 index 0000000000..3bf8e6ff4a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.jms; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.NamingException; + +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.controller.CommandListener; +import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.RegisterClientCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ControllerJmsDelegate +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerJmsDelegate.class); + + private final Map<String, Destination> _clientNameToQueueMap = new ConcurrentHashMap<String, Destination>(); + private final Connection _connection; + private final Destination _controllerQueue; + private final Session _session; + private final QueueCreator _queueCreator; + + private List<CommandListener> _commandListeners = new CopyOnWriteArrayList<CommandListener>(); + + public ControllerJmsDelegate(final Context context) throws NamingException, JMSException + { + final ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("connectionfactory"); + _connection = connectionFactory.createConnection(); + _connection.start(); + _controllerQueue = (Destination) context.lookup("controllerqueue"); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _queueCreator = new QpidQueueCreator(); + } + + public void start() + { + try + { + final MessageConsumer consumer = _session.createConsumer(_controllerQueue); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(final Message message) + { + try + { + String jmsMessageID = message.getJMSMessageID(); + LOGGER.debug("Received message " + jmsMessageID); + + final Command command = JmsMessageAdaptor.messageToCommand(message); + LOGGER.debug("Converted message " + jmsMessageID + " into command: " + command); + + processCommandWithFirstSupportingListener(command); + LOGGER.debug("Finished processing command for message " + jmsMessageID); + } + catch (Throwable t) + { + LOGGER.error("Can't handle JMS message", t); + } + } + }); + } + catch (final JMSException e) + { + throw new DistributedTestException(e); + } + } + + /** ensures connections are closed, otherwise the JVM may be prevented from terminating */ + public void closeConnections() + { + try + { + _session.close(); + } + catch (JMSException e) + { + LOGGER.error("Unable to close session", e); + } + + try + { + _connection.stop(); + } + catch (JMSException e) + { + LOGGER.error("Unable to stop connection", e); + } + + try + { + _connection.close(); + } + catch (JMSException e) + { + throw new DistributedTestException("Unable to close connection", e); + } + } + + public void registerClient(final RegisterClientCommand command) + { + final String clientName = command.getClientName(); + final Destination clientIntructionQueue = createDestinationFromString(command.getClientQueueName()); + _clientNameToQueueMap.put(clientName, clientIntructionQueue); + } + + public void sendCommandToClient(final String clientName, final Command command) + { + final Destination clientQueue = _clientNameToQueueMap.get(clientName); + if (clientQueue == null) + { + throw new DistributedTestException("Client name " + clientName + " not known. I know about: " + + _clientNameToQueueMap.keySet()); + } + + try + { + final MessageProducer producer = _session.createProducer(clientQueue); + final Message message = JmsMessageAdaptor.commandToMessage(_session, command); + + producer.send(message); + } + catch (final JMSException e) + { + throw new DistributedTestException(e); + } + } + + private void processCommandWithFirstSupportingListener(Command command) + { + for (CommandListener listener : _commandListeners) + { + if (listener.supports(command)) + { + listener.processCommand(command); + return; + } + } + + throw new IllegalStateException("There is no registered listener to process command " + command); + } + + private Destination createDestinationFromString(final String clientQueueName) + { + Destination clientIntructionQueue; + try + { + clientIntructionQueue = _session.createQueue(clientQueueName); + } + catch (JMSException e) + { + throw new DistributedTestException("Unable to create Destination from " + clientQueueName); + } + return clientIntructionQueue; + } + + public void createQueues(List<QueueConfig> queues) + { + _queueCreator.createQueues(_connection, queues); + } + + public void deleteQueues(List<QueueConfig> queues) + { + _queueCreator.deleteQueues(_connection, queues); + } + + public void addCommandListener(CommandListener commandListener) + { + _commandListeners.add(commandListener); + } + + public void removeCommandListener(CommandListener commandListener) + { + _commandListeners.remove(commandListener); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java new file mode 100644 index 0000000000..c9dba21a74 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/JmsMessageAdaptor.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.jms; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.qpid.disttest.DistributedTestConstants; +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.json.JsonHandler; +import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CommandType; +import org.apache.qpid.disttest.message.ConsumerParticipantResult; +import org.apache.qpid.disttest.message.CreateConnectionCommand; +import org.apache.qpid.disttest.message.CreateConsumerCommand; +import org.apache.qpid.disttest.message.CreateMessageProviderCommand; +import org.apache.qpid.disttest.message.CreateProducerCommand; +import org.apache.qpid.disttest.message.CreateResponderCommand; +import org.apache.qpid.disttest.message.CreateSessionCommand; +import org.apache.qpid.disttest.message.NoOpCommand; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.ProducerParticipantResult; +import org.apache.qpid.disttest.message.RegisterClientCommand; +import org.apache.qpid.disttest.message.Response; +import org.apache.qpid.disttest.message.StartTestCommand; +import org.apache.qpid.disttest.message.StopClientCommand; +import org.apache.qpid.disttest.message.TearDownTestCommand; + +public class JmsMessageAdaptor +{ + public static Message commandToMessage(final Session session, final Command command) + { + Message jmsMessage = null; + try + { + jmsMessage = session.createMessage(); + jmsMessage.setStringProperty(DistributedTestConstants.MSG_COMMAND_PROPERTY, command.getType().name()); + final JsonHandler jsonHandler = new JsonHandler(); + jmsMessage.setStringProperty(DistributedTestConstants.MSG_JSON_PROPERTY, jsonHandler.marshall(command)); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to convert command " + command + " to JMS Message", jmse); + } + + return jmsMessage; + } + + public static Command messageToCommand(final Message jmsMessage) + { + Command command = null; + try + { + final CommandType commandType = CommandType.valueOf(jmsMessage + .getStringProperty(DistributedTestConstants.MSG_COMMAND_PROPERTY)); + final JsonHandler jsonHandler = new JsonHandler(); + command = jsonHandler.unmarshall(jmsMessage.getStringProperty(DistributedTestConstants.MSG_JSON_PROPERTY), + getCommandClassFromType(commandType)); + } + catch (final JMSException jmse) + { + throw new DistributedTestException("Unable to convert JMS message " + jmsMessage + " to command object", + jmse); + } + return command; + } + + static Class<? extends Command> getCommandClassFromType(final CommandType type) + { + switch (type) + { + case CREATE_CONNECTION: + return CreateConnectionCommand.class; + case CREATE_SESSION: + return CreateSessionCommand.class; + case CREATE_PRODUCER: + return CreateProducerCommand.class; + case CREATE_CONSUMER: + return CreateConsumerCommand.class; + case CREATE_RESPONDER: + return CreateResponderCommand.class; + case NO_OP: + return NoOpCommand.class; + case REGISTER_CLIENT: + return RegisterClientCommand.class; + case STOP_CLIENT: + return StopClientCommand.class; + case RESPONSE: + return Response.class; + case START_TEST: + return StartTestCommand.class; + case TEAR_DOWN_TEST: + return TearDownTestCommand.class; + case PARTICIPANT_RESULT: + return ParticipantResult.class; + case CONSUMER_PARTICIPANT_RESULT: + return ConsumerParticipantResult.class; + case PRODUCER_PARTICIPANT_RESULT: + return ProducerParticipantResult.class; + case CREATE_MESSAGE_PROVIDER: + return CreateMessageProviderCommand.class; + default: + throw new DistributedTestException("No class defined for type: " + type); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java new file mode 100644 index 0000000000..6b21181b2a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.jms; + +import java.util.List; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.apache.qpid.framing.FieldTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidQueueCreator implements QueueCreator +{ + private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class); + + private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); + + @Override + public void createQueues(Connection connection, List<QueueConfig> configs) + { + AMQSession<?, ?> session = createSession(connection); + for (QueueConfig queueConfig : configs) + { + createQueue(session, queueConfig); + } + } + + @Override + public void deleteQueues(Connection connection, List<QueueConfig> configs) + { + AMQSession<?, ?> session = createSession(connection); + for (QueueConfig queueConfig : configs) + { + deleteQueue(session, queueConfig); + } + } + + private void createQueue(AMQSession<?, ?> session, QueueConfig queueConfig) + { + try + { + AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName()); + boolean autoDelete = false; + boolean exclusive = false; + session.createQueue(destination.getAMQQueueName(), autoDelete, + queueConfig.isDurable(), exclusive, queueConfig.getAttributes()); + session.bindQueue(destination.getAMQQueueName(), destination.getRoutingKey(), + EMPTY_QUEUE_BIND_ARGUMENTS, destination.getExchangeName(), + destination, autoDelete); + + LOGGER.info("Created queue " + queueConfig); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to create queue:" + queueConfig, e); + } + } + + private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig) + { + try + { + // The Qpid AMQSession API currently makes the #deleteQueue method protected and the + // raw protocol method public. This should be changed then we should switch the below to + // use #deleteQueue. + AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName()); + session.sendQueueDelete(destination.getAMQQueueName()); + LOGGER.info("Deleted queue " + queueConfig.getName()); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e); + } + } + + private AMQSession<?, ?> createSession(Connection connection) + { + try + { + return (AMQSession<?, ?>) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to create session!", e); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java new file mode 100644 index 0000000000..dd9faef39f --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.jms; + +import java.util.List; + +import javax.jms.Connection; + +import org.apache.qpid.disttest.controller.config.QueueConfig; + +public interface QueueCreator +{ + public void createQueues(final Connection connection, final List<QueueConfig> configs); + public void deleteQueues(final Connection connection, final List<QueueConfig> configs); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java new file mode 100644 index 0000000000..8e50cd4f11 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/JsonHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.json; + +import org.apache.qpid.disttest.client.property.PropertyValue; +import org.apache.qpid.disttest.message.Command; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class JsonHandler +{ + private final Gson _gson = new GsonBuilder() + .registerTypeAdapter(PropertyValue.class, new PropertyValueAdapter()) + .create(); + + public <T extends Command> T unmarshall(final String jsonParams, final Class<T> clazz) + { + return _gson.fromJson(jsonParams, clazz); + } + + public <T extends Command> String marshall(final T command) + { + return _gson.toJson(command); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java new file mode 100644 index 0000000000..94f712e652 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/json/PropertyValueAdapter.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.json; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.disttest.client.property.GeneratedPropertyValue; +import org.apache.qpid.disttest.client.property.PropertyValue; +import org.apache.qpid.disttest.client.property.PropertyValueFactory; +import org.apache.qpid.disttest.client.property.SimplePropertyValue; + +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +public class PropertyValueAdapter implements JsonDeserializer<PropertyValue>, JsonSerializer<PropertyValue> +{ + private static final String DEF_FIELD = "@def"; + private PropertyValueFactory _factory = new PropertyValueFactory(); + + @Override + public PropertyValue deserialize(JsonElement json, Type type, JsonDeserializationContext context) + throws JsonParseException + { + if (json.isJsonNull()) + { + return null; + } + else if (json.isJsonPrimitive()) + { + Object result = null; + JsonPrimitive primitive = json.getAsJsonPrimitive(); + if (primitive.isString()) + { + result = primitive.getAsString(); + } + else if (primitive.isNumber()) + { + String asString = primitive.getAsString(); + if (asString.indexOf('.') != -1 || asString.indexOf('e') != -1) + { + result = primitive.getAsDouble(); + } + else + { + result = primitive.getAsLong(); + } + } + else if (primitive.isBoolean()) + { + result = primitive.getAsBoolean(); + } + else + { + throw new JsonParseException("Unsupported primitive value " + primitive); + } + return new SimplePropertyValue(result); + } + else if (json.isJsonArray()) + { + JsonArray array = json.getAsJsonArray(); + List<Object> result = new ArrayList<Object>(array.size()); + for (JsonElement element : array) + { + result.add(context.deserialize(element, Object.class)); + } + return new SimplePropertyValue(result); + } + else if (json.isJsonObject()) + { + JsonObject object = json.getAsJsonObject(); + JsonElement defElement = object.getAsJsonPrimitive(DEF_FIELD); + Class<?> classInstance = null; + if (defElement != null) + { + try + { + classInstance = _factory.getPropertyValueClass(defElement.getAsString()); + } + catch (ClassNotFoundException e) + { + // ignore + } + } + if (classInstance == null) + { + Map<String, Object> result = new HashMap<String, Object>(); + for (Map.Entry<String, JsonElement> entry : object.entrySet()) + { + Object value = context.deserialize(entry.getValue(), Object.class); + result.put(entry.getKey(), value); + } + return new SimplePropertyValue(result); + } + else + { + return context.deserialize(json, classInstance); + } + } + else + { + throw new JsonParseException("Unsupported JSON type " + json); + } + } + + @Override + public JsonElement serialize(PropertyValue src, Type typeOfSrc, JsonSerializationContext context) + { + if (src instanceof GeneratedPropertyValue) + { + JsonObject object = (JsonObject) context.serialize(src, Object.class); + object.addProperty(DEF_FIELD, ((GeneratedPropertyValue) src).getDefinition()); + return object; + } + else + { + return context.serialize(src.getValue(), Object.class); + } + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java new file mode 100644 index 0000000000..86b4d0e439 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Command.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + + +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.qpid.disttest.Visitor; +import org.apache.qpid.disttest.client.Client; +import org.apache.qpid.disttest.controller.Controller; + +/** + * A command sent between the {@link Controller} and a {@link Client} + */ +public abstract class Command +{ + private final CommandType type; + + public Command(final CommandType type) + { + this.type = type; + } + + public CommandType getType() + { + return type; + } + + public void accept(Visitor visitor) + { + visitor.visit(this); + } + + @Override + public String toString() + { + return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java new file mode 100644 index 0000000000..b04cbdaba1 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CommandType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public enum CommandType +{ + CREATE_CONNECTION, + CREATE_CONSUMER, + CREATE_PRODUCER, + CREATE_RESPONDER, + CREATE_SESSION, + NO_OP, + REGISTER_CLIENT, + RESPONSE, + START_TEST, + STOP_CLIENT, + TEAR_DOWN_TEST, + PARTICIPANT_RESULT, + CONSUMER_PARTICIPANT_RESULT, + PRODUCER_PARTICIPANT_RESULT, + CREATE_MESSAGE_PROVIDER +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java new file mode 100644 index 0000000000..f92e3ea538 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_BROWSIING_SUBSCRIPTION; +import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_DURABLE_SUBSCRIPTION; +import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_NO_LOCAL; +import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR; +import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SYNCHRONOUS_CONSUMER; +import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_TOPIC; + +public class ConsumerParticipantResult extends ParticipantResult +{ + private boolean _topic; + private boolean _durableSubscription; + private boolean _browsingSubscription; + private boolean _selector; + private boolean _noLocal; + private boolean _synchronousConsumer; + + public ConsumerParticipantResult() + { + super(CommandType.CONSUMER_PARTICIPANT_RESULT); + } + + public ConsumerParticipantResult(String participantName) + { + this(); + setParticipantName(participantName); + } + + @OutputAttribute(attribute=IS_DURABLE_SUBSCRIPTION) + public boolean isDurableSubscription() + { + return _durableSubscription; + } + + public void setDurableSubscription(boolean durable) + { + _durableSubscription = durable; + } + + + @OutputAttribute(attribute=IS_BROWSIING_SUBSCRIPTION) + public boolean isBrowsingSubscription() + { + return _browsingSubscription; + } + + public void setBrowsingSubscription(boolean browsingSubscription) + { + _browsingSubscription = browsingSubscription; + } + + + @OutputAttribute(attribute=IS_SELECTOR) + public boolean isSelector() + { + return _selector; + } + + public void setSelector(boolean selector) + { + _selector = selector; + } + + + @OutputAttribute(attribute=IS_NO_LOCAL) + public boolean isNoLocal() + { + return _noLocal; + + } + + public void setNoLocal(boolean noLocal) + { + _noLocal = noLocal; + } + + @OutputAttribute(attribute=IS_SYNCHRONOUS_CONSUMER) + public boolean isSynchronousConsumer() + { + return _synchronousConsumer; + } + + public void setSynchronousConsumer(boolean synchronousConsumer) + { + _synchronousConsumer = synchronousConsumer; + } + + + public void setTopic(boolean isTopic) + { + _topic = isTopic; + } + + @OutputAttribute(attribute=IS_TOPIC) + public boolean isTopic() + { + return _topic; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java new file mode 100644 index 0000000000..c5a96e9a94 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConnectionCommand.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class CreateConnectionCommand extends Command +{ + private String _connectionName; + private String _connectionFactoryName; + + public CreateConnectionCommand() + { + super(CommandType.CREATE_CONNECTION); + } + + public CreateConnectionCommand(String connectionName, String connectionFactoryName) + { + super(CommandType.CREATE_CONNECTION); + _connectionName = connectionName; + _connectionFactoryName = connectionFactoryName; + } + + public void setConnectionName(final String connectionName) + { + this._connectionName = connectionName; + } + + public String getConnectionName() + { + return _connectionName; + } + + public void setConnectionFactoryName(final String connectionFactoryName) + { + this._connectionFactoryName = connectionFactoryName; + } + + public String getConnectionFactoryName() + { + return _connectionFactoryName; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java new file mode 100644 index 0000000000..678e428f94 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class CreateConsumerCommand extends CreateParticpantCommand +{ + private boolean _isTopic; + private boolean _isDurableSubscription; + private boolean _isBrowsingSubscription; + private String _selector; + private boolean _noLocal; + private boolean _synchronous; + private long _receiveTimeout = 5000; + + + public CreateConsumerCommand() + { + super(CommandType.CREATE_CONSUMER); + } + + public boolean isDurableSubscription() + { + return _isDurableSubscription; + } + + public void setDurableSubscription(final boolean isDurableSubscription) + { + this._isDurableSubscription = isDurableSubscription; + } + + public boolean isBrowsingSubscription() + { + return _isBrowsingSubscription; + } + + public void setBrowsingSubscription(final boolean isBrowsingSubscription) + { + _isBrowsingSubscription = isBrowsingSubscription; + } + + public String getSelector() + { + return _selector; + } + + public void setSelector(final String selector) + { + this._selector = selector; + } + + public boolean isNoLocal() + { + return _noLocal; + } + + public void setNoLocal(final boolean noLocal) + { + this._noLocal = noLocal; + } + + public boolean isTopic() + { + return _isTopic; + } + + public void setTopic(boolean isTopic) + { + this._isTopic = isTopic; + } + + public boolean isSynchronous() + { + return _synchronous; + } + + public void setSynchronous(boolean synchronous) + { + _synchronous = synchronous; + } + + public void setReceiveTimeout(long receiveTimeout) + { + _receiveTimeout = receiveTimeout; + + } + + public long getReceiveTimeout() + { + return _receiveTimeout; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java new file mode 100644 index 0000000000..3f30fdd96a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateMessageProviderCommand.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import java.util.Map; + +import org.apache.qpid.disttest.client.property.PropertyValue; + +public class CreateMessageProviderCommand extends Command +{ + private String _providerName; + private Map<String, PropertyValue> _messageProperties; + + public CreateMessageProviderCommand() + { + super(CommandType.CREATE_MESSAGE_PROVIDER); + } + + public String getProviderName() + { + return _providerName; + } + + public void setProviderName(String providerName) + { + this._providerName = providerName; + } + + public Map<String, PropertyValue> getMessageProperties() + { + return _messageProperties; + } + + public void setMessageProperties(Map<String, PropertyValue> messageProperties) + { + this._messageProperties = messageProperties; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java new file mode 100644 index 0000000000..f5d4a3ae9b --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateParticpantCommand.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public abstract class CreateParticpantCommand extends Command +{ + private String _participantName; + private String _sessionName; + private String _destinationName; + private long _numberOfMessages; + private int _batchSize; + private long _maximumDuration; + + public CreateParticpantCommand(CommandType type) + { + super(type); + } + + public String getParticipantName() + { + return _participantName; + } + + public void setParticipantName(final String participantName) + { + _participantName = participantName; + } + + public String getSessionName() + { + return _sessionName; + } + + public void setSessionName(final String sessionName) + { + _sessionName = sessionName; + } + + public String getDestinationName() + { + return _destinationName; + } + + public void setDestinationName(final String destinationName) + { + _destinationName = destinationName; + } + + public long getNumberOfMessages() + { + return _numberOfMessages; + } + + public void setNumberOfMessages(final long numberOfMessages) + { + _numberOfMessages = numberOfMessages; + } + + public int getBatchSize() + { + return _batchSize; + } + + public void setBatchSize(int batchSize) + { + _batchSize = batchSize; + } + + public long getMaximumDuration() + { + return _maximumDuration; + } + + public void setMaximumDuration(long maximumDuration) + { + _maximumDuration = maximumDuration; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java new file mode 100644 index 0000000000..69dfe1ff5a --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateProducerCommand.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class CreateProducerCommand extends CreateParticpantCommand +{ + private int _deliveryMode; + private int _messageSize; + private int _priority; + private long _timeToLive; + private long _interval; + private long _startDelay; + private String _messageProviderName; + + public CreateProducerCommand() + { + super(CommandType.CREATE_PRODUCER); + } + + public int getMessageSize() + { + return _messageSize; + } + + public void setMessageSize(final int messageSize) + { + this._messageSize = messageSize; + } + + public int getPriority() + { + return _priority; + } + + public void setPriority(final int priority) + { + this._priority = priority; + } + + public int getDeliveryMode() + { + return _deliveryMode; + } + + public void setDeliveryMode(final int deliveryMode) + { + this._deliveryMode = deliveryMode; + } + + public long getTimeToLive() + { + return _timeToLive; + } + + public void setTimeToLive(final long timeToLive) + { + this._timeToLive = timeToLive; + } + + public long getInterval() + { + return _interval; + } + + public void setInterval(long interval) + { + this._interval = interval; + } + + public long getStartDelay() + { + return _startDelay; + } + + public void setStartDelay(long startDelay) + { + this._startDelay = startDelay; + } + + public String getMessageProviderName() + { + return _messageProviderName; + } + + public void setMessageProviderName(String messageProviderName) + { + this._messageProviderName = messageProviderName; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java new file mode 100644 index 0000000000..85a2b5e548 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateResponderCommand.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class CreateResponderCommand extends Command +{ + public CreateResponderCommand() + { + super(CommandType.CREATE_RESPONDER); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java new file mode 100644 index 0000000000..f6f59c26af --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateSessionCommand.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class CreateSessionCommand extends Command +{ + private String sessionName; + private String connectionName; + private int acknowledgeMode; + + public CreateSessionCommand() + { + super(CommandType.CREATE_SESSION); + } + + public String getSessionName() + { + return sessionName; + } + + public void setSessionName(final String sessionName) + { + this.sessionName = sessionName; + } + + public String getConnectionName() + { + return connectionName; + } + + public void setConnectionName(final String connectionName) + { + this.connectionName = connectionName; + } + + public int getAcknowledgeMode() + { + return acknowledgeMode; + } + + public void setAcknowledgeMode(final int acknowledgeMode) + { + this.acknowledgeMode = acknowledgeMode; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java new file mode 100644 index 0000000000..1cdaf00163 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/NoOpCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + + +public class NoOpCommand extends Command +{ + public NoOpCommand() + { + super(CommandType.NO_OP); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java new file mode 100644 index 0000000000..b912eaa1cb --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/OutputAttribute.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +/** + * Marks an attribute of {@link ParticipantResult} that should be written to the test output file. + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface OutputAttribute +{ + ParticipantAttribute attribute(); +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java new file mode 100644 index 0000000000..c2fb38cc96 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import org.apache.qpid.disttest.client.Participant; + +/** + * Meta-date representing the attributes of {@link Participant} that we write to the test output file. + * + * Order of declaration is currently important - it determines they order they appear in the output. + * + * @see OutputAttribute + */ +public enum ParticipantAttribute +{ + TEST_NAME("Test Name"), + ITERATION_NUMBER("Iteration number"), + CONFIGURED_CLIENT_NAME("Client Name"), + PARTICIPANT_NAME("Participant name"), + NUMBER_OF_MESSAGES_PROCESSED("Number of messages"), + PAYLOAD_SIZE("Payload size (bytes)"), + PRIORITY("Priority"), + TIME_TO_LIVE("Time to live (ms)"), + DELIVERY_MODE("Delivery mode"), + BATCH_SIZE("Batch size"), + MAXIMUM_DURATION("Maximum duration (ms)"), + PRODUCER_START_DELAY("Producer start delay (ms)"), + PRODUCER_INTERVAL("Producer interval (ms)"), + IS_TOPIC("Is topic"), + IS_DURABLE_SUBSCRIPTION("Is durable subscription"), + IS_BROWSIING_SUBSCRIPTION("Is browsing subscription"), + IS_SELECTOR("Is selector"), + IS_NO_LOCAL("Is no local"), + IS_SYNCHRONOUS_CONSUMER("Is synchronous consumer"), + TOTAL_PAYLOAD_PROCESSED("Total payload processed (bytes)"), + THROUGHPUT("Throughput (kbytes/s)"), + TIME_TAKEN("Time taken (ms)"), + ERROR_MESSAGE("Error message"); + + private String _displayName; + + ParticipantAttribute(String displayName) + { + _displayName = displayName; + } + + public String getDisplayName() + { + return _displayName; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java new file mode 100644 index 0000000000..95a19ceefc --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttributeExtractor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import java.beans.PropertyDescriptor; +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.beanutils.PropertyUtils; + + +public class ParticipantAttributeExtractor +{ + public static Map<ParticipantAttribute, Object> getAttributes(Object targetObject) + { + Map<ParticipantAttribute, Object> attributes = new HashMap<ParticipantAttribute, Object>(); + + + PropertyDescriptor[] descriptors = PropertyUtils.getPropertyDescriptors(targetObject); + for (PropertyDescriptor propertyDescriptor : descriptors) + { + final Method readMethod = getPropertyReadMethod(targetObject, propertyDescriptor); + + for (Annotation annotation : readMethod.getDeclaredAnnotations()) + { + if (annotation instanceof OutputAttribute) + { + OutputAttribute outputAttribute = (OutputAttribute) annotation; + + Object value = getPropertyValue(targetObject, propertyDescriptor.getName()); + attributes.put(outputAttribute.attribute(), value); + } + } + } + + return attributes; + } + + public static Method getPropertyReadMethod(Object targetObject, PropertyDescriptor propertyDescriptor) + { + final Method readMethod = propertyDescriptor.getReadMethod(); + + if (readMethod == null) + { + throw new RuntimeException("No read method for property " + propertyDescriptor.getName() + " on " + targetObject); + } + return readMethod; + } + + public static Object getPropertyValue(Object targetObject, String propertyName) + { + try + { + return PropertyUtils.getProperty(targetObject, propertyName); + } + catch (IllegalAccessException e) + { + throw new RuntimeException("Couldn't get value of property " + propertyName + " from " + targetObject, e); + } + catch (InvocationTargetException e) + { + throw new RuntimeException("Couldn't get value of property " + propertyName + " from " + targetObject, e); + } + catch (NoSuchMethodException e) + { + throw new RuntimeException("Couldn't get value of property " + propertyName + " from " + targetObject, e); + } + + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java new file mode 100644 index 0000000000..4550f10c65 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantResult.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import static org.apache.qpid.disttest.message.ParticipantAttribute.BATCH_SIZE; +import static org.apache.qpid.disttest.message.ParticipantAttribute.CONFIGURED_CLIENT_NAME; +import static org.apache.qpid.disttest.message.ParticipantAttribute.ITERATION_NUMBER; +import static org.apache.qpid.disttest.message.ParticipantAttribute.MAXIMUM_DURATION; +import static org.apache.qpid.disttest.message.ParticipantAttribute.PAYLOAD_SIZE; +import static org.apache.qpid.disttest.message.ParticipantAttribute.NUMBER_OF_MESSAGES_PROCESSED; +import static org.apache.qpid.disttest.message.ParticipantAttribute.THROUGHPUT; +import static org.apache.qpid.disttest.message.ParticipantAttribute.PARTICIPANT_NAME; +import static org.apache.qpid.disttest.message.ParticipantAttribute.TEST_NAME; + +import java.util.Comparator; +import java.util.Date; +import java.util.Map; + +public class ParticipantResult extends Response +{ + private String _testName; + private String _participantName; + private long _startInMillis; + private long _endInMillis; + private int _batchSize; + private long _maximumDuration; + private int _iterationNumber; + + private String _configuredClientName; + + private long _numberOfMessagesProcessed; + private long _totalPayloadProcessed; + private int _payloadSize; + private double _throughput; + + public static final Comparator<? super ParticipantResult> PARTICIPANT_NAME_COMPARATOR = new Comparator<ParticipantResult>() + { + @Override + public int compare(ParticipantResult participantResult1, ParticipantResult participantResult2) + { + return participantResult1.getParticipantName().compareTo(participantResult2.getParticipantName()); + } + }; + + public ParticipantResult() + { + this(CommandType.PARTICIPANT_RESULT); + } + + public ParticipantResult(CommandType commandType) + { + super(commandType); + } + + public ParticipantResult(String participantName) + { + this(); + setParticipantName(participantName); + } + + @OutputAttribute(attribute=TEST_NAME) + public String getTestName() + { + return _testName; + } + + public void setTestName(String testName) + { + _testName = testName; + } + + @OutputAttribute(attribute=ITERATION_NUMBER) + public int getIterationNumber() + { + return _iterationNumber; + } + + public void setIterationNumber(int iterationNumber) + { + _iterationNumber = iterationNumber; + } + + public void setStartDate(Date start) + { + _startInMillis = start.getTime(); + } + + public void setEndDate(Date end) + { + _endInMillis = end.getTime(); + } + + public Date getStartDate() + { + return new Date(_startInMillis); + } + + public Date getEndDate() + { + return new Date(_endInMillis); + } + + + public long getStartInMillis() + { + return _startInMillis; + } + + public long getEndInMillis() + { + return _endInMillis; + } + + + @OutputAttribute(attribute=PARTICIPANT_NAME) + public String getParticipantName() + { + return _participantName; + } + + + public void setParticipantName(String participantName) + { + _participantName = participantName; + } + + @OutputAttribute(attribute=ParticipantAttribute.TIME_TAKEN) + public long getTimeTaken() + { + return _endInMillis - _startInMillis; + } + + @OutputAttribute(attribute=CONFIGURED_CLIENT_NAME) + public String getConfiguredClientName() + { + return _configuredClientName; + } + + public void setConfiguredClientName(String configuredClientName) + { + _configuredClientName = configuredClientName; + } + + @OutputAttribute(attribute=NUMBER_OF_MESSAGES_PROCESSED) + public long getNumberOfMessagesProcessed() + { + return _numberOfMessagesProcessed; + } + + public void setNumberOfMessagesProcessed(long numberOfMessagesProcessed) + { + _numberOfMessagesProcessed = numberOfMessagesProcessed; + } + + @OutputAttribute(attribute=ParticipantAttribute.TOTAL_PAYLOAD_PROCESSED) + public long getTotalPayloadProcessed() + { + return _totalPayloadProcessed; + } + + @OutputAttribute(attribute = PAYLOAD_SIZE) + public int getPayloadSize() + { + return _payloadSize; + } + + public void setPayloadSize(int payloadSize) + { + _payloadSize = payloadSize; + } + + public void setTotalPayloadProcessed(long totalPayloadProcessed) + { + _totalPayloadProcessed = totalPayloadProcessed; + } + + public Map<ParticipantAttribute, Object> getAttributes() + { + return ParticipantAttributeExtractor.getAttributes(this); + } + + public void setBatchSize(int batchSize) + { + _batchSize = batchSize; + } + + @OutputAttribute(attribute=BATCH_SIZE) + public int getBatchSize() + { + return _batchSize; + } + + public void setMaximumDuration(long maximumDuration) + { + _maximumDuration = maximumDuration; + } + + @OutputAttribute(attribute=MAXIMUM_DURATION) + public long getMaximumDuration() + { + return _maximumDuration; + } + + @OutputAttribute(attribute=THROUGHPUT) + public double getThroughput() + { + return _throughput; + } + + public void setThroughput(double throughput) + { + _throughput = throughput; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java new file mode 100644 index 0000000000..766c90eec8 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ProducerParticipantResult.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +import static org.apache.qpid.disttest.message.ParticipantAttribute.DELIVERY_MODE; +import static org.apache.qpid.disttest.message.ParticipantAttribute.PRIORITY; +import static org.apache.qpid.disttest.message.ParticipantAttribute.PRODUCER_INTERVAL; +import static org.apache.qpid.disttest.message.ParticipantAttribute.PRODUCER_START_DELAY; +import static org.apache.qpid.disttest.message.ParticipantAttribute.TIME_TO_LIVE; + +public class ProducerParticipantResult extends ParticipantResult +{ + private int _priority; + private long _timeToLive; + private long _startDelay; + private long _interval; + private int _deliveryMode; + public ProducerParticipantResult() + { + super(CommandType.PRODUCER_PARTICIPANT_RESULT); + } + + public ProducerParticipantResult(String participantName) + { + this(); + setParticipantName(participantName); + } + + @OutputAttribute(attribute=PRIORITY) + public int getPriority() + { + return _priority; + } + + public void setPriority(int priority) + { + _priority = priority; + } + + @OutputAttribute(attribute=TIME_TO_LIVE) + public long getTimeToLive() + { + return _timeToLive; + } + + public void setTimeToLive(long timeToLive) + { + _timeToLive = timeToLive; + } + + @OutputAttribute(attribute=PRODUCER_START_DELAY) + public long getStartDelay() + { + return _startDelay; + } + + public void setStartDelay(long startDelay) + { + _startDelay = startDelay; + } + + @OutputAttribute(attribute=PRODUCER_INTERVAL) + public long getInterval() + { + return _interval; + } + + public void setInterval(long producerInterval) + { + _interval = producerInterval; + } + + @OutputAttribute(attribute=DELIVERY_MODE) + public int getDeliveryMode() + { + return _deliveryMode; + } + + public void setDeliveryMode(int deliveryMode) + { + this._deliveryMode = deliveryMode; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java new file mode 100644 index 0000000000..af880a37d9 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/RegisterClientCommand.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class RegisterClientCommand extends Command +{ + private final String _clientName; + private final String _clientQueueName; + + public RegisterClientCommand(final String clientName, final String clientQueueName) + { + super(CommandType.REGISTER_CLIENT); + _clientName = clientName; + _clientQueueName = clientQueueName; + } + + public String getClientName() + { + return _clientName; + } + + public String getClientQueueName() + { + return _clientQueueName; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java new file mode 100644 index 0000000000..aac056efcb --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/Response.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + + +public class Response extends Command +{ + protected String _registeredClientName; + protected String _errorMessage; + private CommandType _inReplyToCommandType; + + public Response(final String registeredclientName, final CommandType inReplyToCommandType, final String errorMessage) + { + super(CommandType.RESPONSE); + _registeredClientName = registeredclientName; + _errorMessage = errorMessage; + _inReplyToCommandType = inReplyToCommandType; + } + + public Response(String clientName, CommandType inReplyToCommandType) + { + this(clientName, inReplyToCommandType, null); + } + + /** + * Provided so that subclasses can call super(commandType) + */ + protected Response(CommandType commandType) + { + super(commandType); + } + + public String getRegisteredClientName() + { + return _registeredClientName; + } + + public void setRegisteredClientName(String registeredClientName) + { + _registeredClientName = registeredClientName; + } + + @OutputAttribute(attribute=ParticipantAttribute.ERROR_MESSAGE) + public String getErrorMessage() + { + return _errorMessage; + } + + public void setErrorMessage(String errorMessage) + { + _errorMessage = errorMessage; + } + + public boolean hasError() + { + return _errorMessage != null; + } + + public CommandType getInReplyToCommandType() + { + return _inReplyToCommandType; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java new file mode 100644 index 0000000000..4a53697ecd --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StartTestCommand.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +public class StartTestCommand extends Command +{ + + public StartTestCommand() + { + super(CommandType.START_TEST); + } + +}
\ No newline at end of file diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java new file mode 100644 index 0000000000..08758aaa69 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/StopClientCommand.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.message; + +public class StopClientCommand extends Command +{ + public StopClientCommand() + { + super(CommandType.STOP_CLIENT); + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java new file mode 100644 index 0000000000..6b1367d4f9 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/TearDownTestCommand.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.message; + +public class TearDownTestCommand extends Command +{ + + public TearDownTestCommand() + { + super(CommandType.TEAR_DOWN_TEST); + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java new file mode 100644 index 0000000000..5e6da2e65b --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/AggregatedTestResult.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.results.aggregation; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.disttest.message.ParticipantResult; + +public class AggregatedTestResult implements ITestResult +{ + private ParticipantResult _allParticipantResult; + private ParticipantResult _allConsumerParticipantResult; + private ParticipantResult _allProducerParticipantResult; + private final ITestResult _originalTestResult; + + public AggregatedTestResult(ITestResult originalTestResult) + { + _originalTestResult = originalTestResult; + } + + /** + * Returns the result where {@link ParticipantResult#getNumberOfMessagesProcessed()} + * is the total number of messages consumed during the test, and {@link ParticipantResult#getTimeTaken()} + * is the time between the start of the first producer and the end of the last consumer to finish. + */ + public ParticipantResult getAllParticipantResult() + { + return _allParticipantResult; + } + + public void setAllParticipantResult(ParticipantResult allParticipantResult) + { + _allParticipantResult = allParticipantResult; + } + + public ParticipantResult getAllConsumerParticipantResult() + { + return _allConsumerParticipantResult; + } + public void setAllConsumerParticipantResult(ParticipantResult allConsumerParticipantResult) + { + _allConsumerParticipantResult = allConsumerParticipantResult; + } + public ParticipantResult getAllProducerParticipantResult() + { + return _allProducerParticipantResult; + } + public void setAllProducerParticipantResult(ParticipantResult allProducerParticipantResult) + { + _allProducerParticipantResult = allProducerParticipantResult; + } + + // TODO should weaken to Collection + @Override + public List<ParticipantResult> getParticipantResults() + { + List<ParticipantResult> allParticipantResults = new ArrayList<ParticipantResult>(_originalTestResult.getParticipantResults()); + + allParticipantResults.add(_allConsumerParticipantResult); + allParticipantResults.add(_allProducerParticipantResult); + allParticipantResults.add(_allParticipantResult); + + return allParticipantResults; + } + + @Override + public boolean hasErrors() + { + return _originalTestResult.hasErrors(); + } + + @Override + public String getName() + { + return _originalTestResult.getName(); + } + + + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java new file mode 100644 index 0000000000..cde30d36e5 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/Aggregator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.results.aggregation; + +import org.apache.qpid.disttest.controller.ResultsForAllTests; + +public class Aggregator +{ + + private TestResultAggregator _testResultAggregator = new TestResultAggregator(); + + public ResultsForAllTests aggregateResults(ResultsForAllTests rawResultsForAllTests) + { + + ResultsForAllTests aggregatedResultsForAllTests = new ResultsForAllTests(); + + + for (ITestResult testResult : rawResultsForAllTests.getTestResults()) + { + AggregatedTestResult aggregateTestResult = _testResultAggregator.aggregateTestResult(testResult); + aggregatedResultsForAllTests.add(aggregateTestResult); + } + + + return aggregatedResultsForAllTests; + } + + void setTestResultAggregator(TestResultAggregator testResultAggregator) + { + _testResultAggregator = testResultAggregator; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java new file mode 100644 index 0000000000..3f9cdff69d --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ITestResult.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.results.aggregation; + +import java.util.List; + +import org.apache.qpid.disttest.message.ParticipantResult; + +// TODO rename me!! +public interface ITestResult +{ + + // TODO should weaken to Collection + List<ParticipantResult> getParticipantResults(); + + boolean hasErrors(); + + String getName(); + +}
\ No newline at end of file diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java new file mode 100644 index 0000000000..6c4e4f87ac --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.results.aggregation; + +import java.util.Date; +import java.util.NavigableSet; +import java.util.TreeSet; + +import org.apache.qpid.disttest.message.ParticipantResult; + +public class ParticipantResultAggregator +{ + private final String _aggregatedResultName; + private final Class<? extends ParticipantResult> _targetClass; + + private long _minStartDate = Long.MAX_VALUE; + private long _maxEndDate = 0; + private long _numberOfMessagesProcessed = 0; + private long _totalPayloadProcessed = 0; + + private NavigableSet<Integer> _encounteredPayloadSizes = new TreeSet<Integer>(); + private NavigableSet<Integer> _encounteredIterationNumbers = new TreeSet<Integer>(); + private NavigableSet<String> _encountedTestNames = new TreeSet<String>(); + + public ParticipantResultAggregator(Class<? extends ParticipantResult> taregtClass, String aggregateResultName) + { + _aggregatedResultName = aggregateResultName; + _targetClass = taregtClass; + } + + public void aggregate(ParticipantResult result) + { + if (isAggregatable(result)) + { + rollupConstantAttributes(result); + computeVariableAttributes(result); + } + } + + public ParticipantResult getAggregatedResult() + { + ParticipantResult aggregatedResult = new ParticipantResult(_aggregatedResultName); + + setRolledUpConstantAttributes(aggregatedResult); + setComputedVariableAttributes(aggregatedResult); + + return aggregatedResult; + } + + private boolean isAggregatable(ParticipantResult result) + { + return _targetClass.isAssignableFrom(result.getClass()); + } + + private void computeVariableAttributes(ParticipantResult result) + { + _numberOfMessagesProcessed += result.getNumberOfMessagesProcessed(); + _totalPayloadProcessed += result.getTotalPayloadProcessed(); + _minStartDate = Math.min(_minStartDate, result.getStartInMillis()); + _maxEndDate = Math.max(_maxEndDate, result.getEndInMillis()); + } + + private void rollupConstantAttributes(ParticipantResult result) + { + if (result.getTestName() != null) + { + _encountedTestNames.add(result.getTestName()); + } + _encounteredPayloadSizes.add(result.getPayloadSize()); + _encounteredIterationNumbers.add(result.getIterationNumber()); + } + + private void setComputedVariableAttributes(ParticipantResult aggregatedResult) + { + aggregatedResult.setNumberOfMessagesProcessed(_numberOfMessagesProcessed); + aggregatedResult.setTotalPayloadProcessed(_totalPayloadProcessed); + aggregatedResult.setStartDate(new Date(_minStartDate)); + aggregatedResult.setEndDate(new Date(_maxEndDate)); + aggregatedResult.setThroughput(calculateThroughputInKiloBytesPerSecond()); + } + + private void setRolledUpConstantAttributes(ParticipantResult aggregatedResult) + { + if (_encounteredIterationNumbers.size() == 1) + { + aggregatedResult.setIterationNumber( _encounteredIterationNumbers.first()); + } + if (_encounteredPayloadSizes.size() == 1) + { + aggregatedResult.setPayloadSize(_encounteredPayloadSizes.first()); + } + if (_encountedTestNames.size() == 1) + { + aggregatedResult.setTestName(_encountedTestNames.first()); + } + } + + private double calculateThroughputInKiloBytesPerSecond() + { + double durationInMillis = _maxEndDate - _minStartDate; + double durationInSeconds = durationInMillis / 1000; + double totalPayloadProcessedInKiloBytes = ((double)_totalPayloadProcessed) / 1024; + + return totalPayloadProcessedInKiloBytes/durationInSeconds; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java new file mode 100644 index 0000000000..1938add560 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.results.aggregation; + +import org.apache.qpid.disttest.message.ConsumerParticipantResult; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.ProducerParticipantResult; + +public class TestResultAggregator +{ + static final String AGGREGATED_ERROR_MESSAGE = "One or more participants reported errors."; + public static final String ALL_PARTICIPANTS_NAME = "All"; + public static final String ALL_PRODUCER_PARTICIPANTS_NAME = "All Producers"; + public static final String ALL_CONSUMER_PARTICIPANTS_NAME = "All Consumers"; + + public AggregatedTestResult aggregateTestResult(ITestResult originalTestResult) + { + AggregatedTestResult newTestResult = new AggregatedTestResult(originalTestResult); + + ParticipantResultAggregator consumerParticipantResultAggregator = new ParticipantResultAggregator(ConsumerParticipantResult.class, + ALL_CONSUMER_PARTICIPANTS_NAME); + ParticipantResultAggregator producerParticipantResultAggregator = new ParticipantResultAggregator(ProducerParticipantResult.class, + ALL_PRODUCER_PARTICIPANTS_NAME); + + boolean hasError = aggregate(originalTestResult, + consumerParticipantResultAggregator, + producerParticipantResultAggregator); + + ParticipantResult aggregatedProducerResult = producerParticipantResultAggregator.getAggregatedResult(); + newTestResult.setAllProducerParticipantResult(aggregatedProducerResult); + + ParticipantResult aggregaredConsumerResult = consumerParticipantResultAggregator.getAggregatedResult(); + newTestResult.setAllConsumerParticipantResult(aggregaredConsumerResult); + + ParticipantResult aggregatedAllResult = buildAllResultFromOtherAggregatedResults( + aggregatedProducerResult, aggregaredConsumerResult); + + if (hasError) + { + aggregatedAllResult.setErrorMessage(TestResultAggregator.AGGREGATED_ERROR_MESSAGE); + } + newTestResult.setAllParticipantResult(aggregatedAllResult); + + return newTestResult; + } + + private boolean aggregate(ITestResult originalTestResult, + ParticipantResultAggregator consumerParticipantResultAggregator, + ParticipantResultAggregator producerParticipantResultAggregator) + { + boolean hasError = false; + for (ParticipantResult result : originalTestResult.getParticipantResults()) + { + consumerParticipantResultAggregator.aggregate(result); + producerParticipantResultAggregator.aggregate(result); + + if (result.hasError()) + { + hasError = true; + } + } + return hasError; + } + + private ParticipantResult buildAllResultFromOtherAggregatedResults( + ParticipantResult aggregatedProducerResult, ParticipantResult aggregaredConsumerResult) + { + ParticipantResult aggregatedAllResult = new ParticipantResult(ALL_PARTICIPANTS_NAME); + aggregatedAllResult.setStartDate(aggregatedProducerResult.getStartDate()); + + aggregatedAllResult.setEndDate(aggregaredConsumerResult.getEndDate()); + + aggregatedAllResult.setIterationNumber(aggregaredConsumerResult.getIterationNumber()); + aggregatedAllResult.setTestName(aggregaredConsumerResult.getTestName()); + aggregatedAllResult.setNumberOfMessagesProcessed(aggregaredConsumerResult.getNumberOfMessagesProcessed()); + aggregatedAllResult.setPayloadSize(aggregaredConsumerResult.getPayloadSize()); + aggregatedAllResult.setTotalPayloadProcessed(aggregaredConsumerResult.getTotalPayloadProcessed()); + aggregatedAllResult.setThroughput(aggregaredConsumerResult.getThroughput()); + + return aggregatedAllResult; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java new file mode 100644 index 0000000000..52e53ca624 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVFormater.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.results.formatting; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.qpid.disttest.controller.ResultsForAllTests; +import org.apache.qpid.disttest.message.ParticipantAttribute; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.results.aggregation.ITestResult; + +/** + * produces CSV output using the ordered enums in {@link ParticipantAttribute} + */ +public class CSVFormater +{ + public String format(ResultsForAllTests results) + { + StringBuilder builder = new StringBuilder(); + + builder.append(header()); + + List<ITestResult> testResults = results.getTestResults(); + + for (ITestResult testResult : testResults) + { + + List<ParticipantResult> participantResults = new ArrayList<ParticipantResult>(testResult.getParticipantResults()); + Collections.sort(participantResults, new CSVOrderParticipantResultComparator()); + + for (ParticipantResult participantResult : participantResults) + { + Map<ParticipantAttribute, Object> attributes = participantResult.getAttributes(); + builder.append(row(attributes)); + } + } + + return builder.toString(); + } + + /** + * return a row, including a newline character at the end + */ + private String row(Map<ParticipantAttribute, Object> attributeValueMap) + { + List<Object> attributeValues = new ArrayList<Object>(); + for (ParticipantAttribute attribute : ParticipantAttribute.values()) + { + attributeValues.add(attributeValueMap.get(attribute)); + } + + String row = StringUtils.join(attributeValues.toArray(), ","); + return row + "\n"; + } + + /** return the header row, including a newline at the end */ + private String header() + { + List<String> displayNames = new ArrayList<String>(); + for (ParticipantAttribute attribute : ParticipantAttribute.values()) + { + displayNames.add(attribute.getDisplayName()); + } + + String header = StringUtils.join(displayNames.toArray(), ","); + return header + "\n"; + } + +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java new file mode 100644 index 0000000000..0e1fbbc3c6 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/formatting/CSVOrderParticipantResultComparator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.results.formatting; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.builder.CompareToBuilder; +import org.apache.qpid.disttest.message.ConsumerParticipantResult; +import org.apache.qpid.disttest.message.ParticipantResult; +import org.apache.qpid.disttest.message.ProducerParticipantResult; + +public class CSVOrderParticipantResultComparator implements Comparator<ParticipantResult> +{ + // TODO yuk + private static final Map<Class<? extends ParticipantResult>, Integer> TYPE_CODES = new HashMap<Class<? extends ParticipantResult>, Integer>(); + static { + TYPE_CODES.put(ProducerParticipantResult.class, 0); + TYPE_CODES.put(ConsumerParticipantResult.class, 1); + TYPE_CODES.put(ParticipantResult.class, 2); + } + + @Override + public int compare(ParticipantResult left, ParticipantResult right) + { + return new CompareToBuilder() + .append(getTypeCode(left), getTypeCode(right)) + .append(left.getParticipantName(), right.getParticipantName()) + .toComparison(); + } + + + private int getTypeCode(ParticipantResult participantResult) + { + return TYPE_CODES.get(participantResult.getClass()); + } + +} diff --git a/qpid/java/perftests/src/main/resources/perftests.properties b/qpid/java/perftests/src/main/resources/perftests.properties new file mode 100644 index 0000000000..d8823f9dc5 --- /dev/null +++ b/qpid/java/perftests/src/main/resources/perftests.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +connectionfactory.connectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' |
