diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-14 20:02:03 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-14 20:02:03 +0000 |
| commit | a22f3f594d6eee7d610fb4f140e18cddd7c880f6 (patch) | |
| tree | 5adb376ed217d2debaff1c0bdd59af1a1c93e829 /java/perftests | |
| parent | 9cb1922884c5b258c961046e6fd48e5152aa79d5 (diff) | |
| download | qpid-python-a22f3f594d6eee7d610fb4f140e18cddd7c880f6.tar.gz | |
First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests')
24 files changed, 3627 insertions, 665 deletions
diff --git a/java/perftests/RunningPerformanceTests.txt b/java/perftests/RunningPerformanceTests.txt new file mode 100644 index 0000000000..ff3d5ef4f5 --- /dev/null +++ b/java/perftests/RunningPerformanceTests.txt @@ -0,0 +1,112 @@ +Running Performance Tests +------------------------- + +This performance test suite contains a number of tests. + +- Service request-reply +- Ping-Pong +- Topic + +Service request-reply +--------------------- + +Description: +This is the simplest test to ensure everything is working. This involves +one client that is known as a "service provider" and it listens on a +well-known queue for requests. Another client, known as the "service requester" +creates a private (temporary) response queue, creates a message with the +private response queue set as the "reply to" field and then publishes the +message to the well known service queue. The test allows you to time how long +it takes to send messages and receive the response back. It also allows varying +of the message size. + +Quick Run: + +./serviceRequestReply-QuickTest.sh <brokerdetails> <number of messages> + +This provides a quick test to run everything against a running broker. Simply specify broker and number of messages to run. + + +Detailed Run: + +You must start the service provider first: + +serviceProvidingClient.sh <brokerdetails> [<P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]>] [selector] + +where Brokerdetails is the connection information to the broker you are running on; e.g. localhost or localhost:5670 or tcp://10.10.10.10:5677. +By default Non Persistent, Non Transaction messages are used in the response. A selector may also be specified. + + +To run the service requester: + +serviceRequestingClient.sh <Brokerdetails> <Number of Messages> [<Message Size>] [<P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]>] + +This requests the <number of messages> of a <Message Size (default 4096 bytes>. By default the connection is Non Persistent and Non Transactional. + +After receiving all the messages the client outputs the rate it achieved. + + +Ping-Pong +--------- + +Description: +Quick Run: +Detailed Run: + +Topic +------- + +Description: +A more realistic test is the topic test, which tests the +performance of the topic exchange to a configurable number of clients (e.g. 50). +A publisher sends batches of of messages to a topic that a number of clients are +subscribed to. The clients recevie each all the messages and then send a response. + +The time taken to send all messages and receive a response from all clients is displayed. + +Quick Run: + +./topic-QuickTest.sh <host> <port> <messages> <clients> <batches> + +This provides a quick test to run everything against a running broker. Simply specify host, port, the number of messages, number of clients and number of batches to run this quick test. + +Detailed Run: + +You must run the listener processes first: + +run_many.sh 10 topic "topicListener.sh [-host <host> -port <port>]" + +In this command, the first argument means start 10 processes, the +second is just a name use in the log files generated and the third +argument is the command to run the specified number of times. + +The topicListener by default connects to localhost:5672 this can be changed using the above flags. + +Then run the publisher process: + +headersPublisher.sh [-host <host> -port <port> -messages <number> -clients <number> -batch <number>] + +The default is to connect to localhost:5672 and send 1 batch of 1000 messages expecting 1 client to respond. + +Note that before starting the publisher you should wait about 30 +seconds to ensure all the clients are registered with the broker (you +can see this from the broker output). Otherwise the numbers will be +slightly skewed. + + +Additional parameters to scripts + +Publisher +-payload <int> : specify the payload size (256b Default) +-delay <long> : Number of seconds to send between batches (0 Default) +-warmup <int> : Number of messages to send as a warm up (0 Default) +-ack <int> : Acknowledgement mode + - 1 : Auto + - 2 : Client + - 3 : Dups_OK + - 257 : No (Default) + - 258 : Pre +-factory <string> : ConnectionFactoryInitialiser class +-persistent <"true"|other> : User persistent messages if string equals "true" (false Default) +-clientId <string> : Set client id +-subscriptionId <string> : set subscription id diff --git a/java/perftests/bin/run_many.sh b/java/perftests/bin/run_many.sh deleted file mode 100755 index cca2ffec21..0000000000 --- a/java/perftests/bin/run_many.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -# args: -# <number of processes to start> -# <name of run> -# <command ro run> - -for i in `seq 1 $1`; do - $3 >$2.$i.out 2>>$2.err & - echo $! > $2.$i.pid -done; diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh deleted file mode 100755 index 207e4439f1..0000000000 --- a/java/perftests/bin/serviceProvidingClient.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP -# usage: just pass in the host(s) -$JAVA_HOME/bin/java -cp $CP org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh deleted file mode 100755 index 7dd3d63c27..0000000000 --- a/java/perftests/bin/serviceRequestingClient.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# args supplied: <host:port> <num messages> -thehosts=$1 -shift -echo $thehosts -# XXX -Xms1024m -XX:NewSize=300m -. ./setupclasspath.sh -echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@" diff --git a/java/perftests/bin/setupclasspath.sh b/java/perftests/bin/setupclasspath.sh deleted file mode 100755 index a660392e77..0000000000 --- a/java/perftests/bin/setupclasspath.sh +++ /dev/null @@ -1,9 +0,0 @@ -if [ -z $QPID_HOME ] ; then - echo "QPID_HOME must be set" - exit -fi -CP=$QPID_HOME/lib/qpid-incubating.jar:../target/classes - -if [ `uname -o` == "Cygwin" ] ; then - CP=`cygpath --path --windows $CP` -fi diff --git a/java/perftests/bin/topicListener.sh b/java/perftests/bin/topicListener.sh deleted file mode 100755 index 454efefe7d..0000000000 --- a/java/perftests/bin/topicListener.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -# XXX -Xmx512m -Xms512m -XX:NewSize=150m -. ./setupclasspath.sh -echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.topic.Listener $* diff --git a/java/perftests/bin/topicPublisher.sh b/java/perftests/bin/topicPublisher.sh deleted file mode 100755 index cc3a8736cc..0000000000 --- a/java/perftests/bin/topicPublisher.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# XXX -Xmx512m -Xms512m -XX:NewSize=150m -. ./setupclasspath.sh -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.topic.Publisher $* diff --git a/java/perftests/distribution/pom.xml b/java/perftests/distribution/pom.xml new file mode 100644 index 0000000000..010f19c9f0 --- /dev/null +++ b/java/perftests/distribution/pom.xml @@ -0,0 +1,127 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-perftests-distribution</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid Performance Tests Distribution</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <java.source.version>1.5</java.source.version> + <qpid.version>${pom.version}</qpid.version> + <qpid.targetDir>${project.build.directory}</qpid.targetDir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-perftests</artifactId> + <type>jar</type> + <version>${pom.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-perftests</artifactId> + <type>test-jar</type> + <version>${pom.version}</version> + </dependency> + <dependency> + <groupId>uk.co.thebadgerset</groupId> + <artifactId>junit-toolkit</artifactId> + <version>0.4</version> + <scope>runtime</scope> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>${java.source.version}</source> + <target>${java.source.version}</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>${assembly.version}</version> + <configuration> + <descriptors> + <descriptor>src/main/assembly/performance.xml</descriptor> + </descriptors> + <finalName>qpid-${pom.version}</finalName> + <outputDirectory>${qpid.targetDir}</outputDirectory> + <tarLongFileMode>gnu</tarLongFileMode> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <finalName>qpid-performance</finalName> + <archive> + <manifest> + <addClasspath>true</addClasspath> + </manifest> + </archive> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>distribution-package</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <descriptors> + <descriptor>src/main/assembly/performance.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + </build> + +</project> diff --git a/java/perftests/distribution/src/main/assembly/performance.xml b/java/perftests/distribution/src/main/assembly/performance.xml new file mode 100644 index 0000000000..a564261a24 --- /dev/null +++ b/java/perftests/distribution/src/main/assembly/performance.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<assembly> + <id>performance-test-java</id> + <includeBaseDirectory>false</includeBaseDirectory> + <formats> + <format>tar.gz</format> + <format>zip</format> + </formats> + + <fileSets> + <!-- Apache Licensing --> + <fileSet> + <directory>../../resources</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>DISCLAIMER</include> + <include>LICENSE.txt</include> + <include>NOTICE.txt</include> + <include>README.txt</include> + </includes> + </fileSet> + + <fileSet> + <directory>../../release-docs</directory> + <outputDirectory>qpid-${qpid.version}/docs</outputDirectory> + <includes> + <include>RELEASE_NOTES.txt</include> + </includes> + </fileSet> + + <!-- Performance txt files--> + <fileSet> + <directory>..</directory> + <outputDirectory>qpid-${qpid.version}</outputDirectory> + <includes> + <include>*.txt</include> + </includes> + </fileSet> + + <!-- Execution Scripts --> + <fileSet> + <directory>../bin</directory> + <outputDirectory>qpid-${qpid.version}/bin</outputDirectory> + <includes> + <include>*</include> + </includes> + <fileMode>777</fileMode> + </fileSet> + + <!-- Provide Source in easy access location --> + <fileSet> + <directory>../src/main</directory> + <outputDirectory>qpid-${qpid.version}/src</outputDirectory> + <includes> + <include>**/*.java</include> + <include>**/*.log4j</include> + </includes> + </fileSet> + <fileSet> + <directory>../src/test</directory> + <outputDirectory>qpid-${qpid.version}/src</outputDirectory> + <includes> + <include>**/*.java</include> + </includes> + </fileSet> + + <!-- Metadata Jar --> + <fileSet> + <directory>target</directory> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <includes> + <include>qpid-performance.jar</include> + </includes> + </fileSet> + </fileSets> + <dependencySets> + <dependencySet> + <outputDirectory>qpid-${qpid.version}/lib</outputDirectory> + <unpack>false</unpack> + <excludes> + <exclude>org.apache.qpid:qpid-perftests-distribution</exclude> + </excludes> + </dependencySet> + </dependencySets> +</assembly> diff --git a/java/perftests/jar-with-dependencies.xml b/java/perftests/jar-with-dependencies.xml new file mode 100644 index 0000000000..62978ee864 --- /dev/null +++ b/java/perftests/jar-with-dependencies.xml @@ -0,0 +1,29 @@ +<!-- This is an assembly descriptor that produces a jar file that contains all the
+ dependencies, fully expanded into a single jar, required to run the tests of
+ a maven project.
+ -->
+<assembly>
+ <id>all-test-deps</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory></outputDirectory>
+ <outputFileNameMapping></outputFileNameMapping>
+ <unpack>true</unpack>
+ <scope>test</scope>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>target/classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>target/test-classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 3af906c4ac..758678fc84 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -19,6 +19,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> <groupId>org.apache.qpid</groupId> <artifactId>qpid-perftests</artifactId> @@ -35,21 +36,219 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> + <log4j.perftests>perftests.log4j</log4j.perftests> </properties> + <!-- Temporary local maven repo, whilst JUnit Toolkit is still reaching stable version to add to central maven repository. --> + <repositories> + <repository> + <id>junit-toolkit.snapshots</id> + <name>JUnit Toolkit SNAPSHOT Repository</name> + <url>file://${basedir}/../mvn-repo</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <!-- Temporary local maven repo, whilst JUnit Toolkit is still reaching stable version to add to central maven repository. --> + <pluginRepositories> + <pluginRepository> + <id>junit-toolkit-plugin.snapshots</id> + <name>JUnit Toolkit SNAPSHOT Repository</name> + <url>file://${basedir}/../mvn-repo</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + </pluginRepository> + </pluginRepositories> + <dependencies> + <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-client</artifactId> </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + + <dependency> + <groupId>uk.co.thebadgerset</groupId> + <artifactId>junit-toolkit</artifactId> + </dependency> + + <!-- Test dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + </dependencies> -<!-- <build> + <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-antrun-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + + <!-- The JUnit Toolkit maven2 plugin is in the process of being added to the maven repository. + + Configures the toolkit test runner for performance testing. These can be run from within maven, or by using the generated + scripts. + + To run from within maven: + + mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:tktest + + To run from the command line (after doing assembly:assembly goal): + + java -cp target/test_jar-jar-with-dependencies.jar uk.co.thebadgerset.junit.extensions.TKTestRunner -s 1 -r 100000 + -o target org.apache.qpid.requestreply.PingPongTestPerf + + To generate the scripts do: + + mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:tkscriptgen + + Then to run the scripts, in the target directory do (after doing assembly:assembly goal): + + ./script_name.sh + + These scripts can find everything in the 'all test dependencies' jar created by the assembly:assembly goal. + --> + <plugin> + <groupId>uk.co.thebadgerset</groupId> + <artifactId>junit-toolkit-maven-plugin</artifactId> + + <configuration> + <scriptOutDirectory>target</scriptOutDirectory> + <testJar>${project.build.finalName}-all-test-deps.jar</testJar> + + <systemproperties> + <property> + <name>log4j.configuration</name> + <value>${log4j.perftests}</value> + </property> + <property> + <name>amqj.logging.level</name> + <value>warn</value> + </property> + <property><!-- Turn off most logging messages from the junit-toolkit test tool itself. --> + <name>badger.level</name> + <value>warn</value> + </property> + <property> + <name>amqj.test.logging.level</name> + <value>info</value> + </property> + </systemproperties> + + <commands> + <!-- Single pings. These can be scaled up by overriding the parameters when calling the test script. --> + <Ping-Once>-n Ping-Once -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf</Ping-Once> + <Ping-Once-Async>-n Ping-Once-Async -s [1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf</Ping-Once-Async> + <Ping-Latency>-n Ping-Latency -s [1000] -d 10S -t testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf</Ping-Latency> + + <!-- More example Tests. These are examples to exercise all the features of the test harness. Can scale up with option overrides. --> + <Ping-Tx>-n Ping-Tx -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true</Ping-Tx> + <Ping-Size>-n Ping-Size -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messagesize=512</Ping-Size> + <Ping-Concurrent>-n Ping-Concurrent -s [100] -c [4] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf</Ping-Concurrent> + <Ping-Many-Queues> + -n Ping-Many-Queues -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf destinationcount=4 + </Ping-Many-Queues> + <Ping-Duration>-n Ping-Duration -s [100] -d10S -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf</Ping-Duration> + <Ping-Rate>-n Ping-Rate -s [100] -d10S -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=500</Ping-Rate> + <Ping-PubSub>-n Ping-PubSub -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true</Ping-PubSub> + <Ping-Many-Topics> + -n Ping-Many-Topics -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=4 + </Ping-Many-Topics> + <Ping-Persistent> + -n Ping-Persistent -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true + </Ping-Persistent> + <Ping-Batch-Logging> + -n Ping-Batch-Logging -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10 + </Ping-Batch-Logging> + <Ping-Failover-Before-Send> + -n Ping-Failover-Before-Send -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailBeforeSend=true + </Ping-Failover-Before-Send> + <Ping-Failover-After-Send> + -n Ping-Failover-After-Send -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterSend=true + </Ping-Failover-After-Send> + <Ping-Failover-Before-Commit> + -n Ping-Failover-Before-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailBeforeCommit=true + </Ping-Failover-Before-Commit> + <Ping-Failover-After-Commit> + -n Ping-Failover-After-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterCommit=true + </Ping-Failover-After-Commit> + + </commands> + </configuration> + + <executions> + <execution> + <phase>test</phase> + <!--<goals> + <goal>tktest</goal> + </goals>--> + </execution> + </executions> + </plugin> + + <!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests. + + Usefull when bundling system, integration or performance tests into a convenient + package to hand over to testers. To use it run: + + java -cp target/your_app_name-all-test-deps.jar path.to.your.Class + + or often: + + java -cp target/your_app_name-all-test-deps.jar junit.framework.textui.TestRunner path.to.your.test.Class + + or other JUnit test runner invocations. + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.2-SNAPSHOT</version> + <configuration> + <descriptors> + <descriptor>jar-with-dependencies.xml</descriptor> + </descriptors> + <outputDirectory>target</outputDirectory> + <workDirectory>target/assembly/work</workDirectory> + </configuration> + </plugin> + </plugins> - </build>--> + + <resources> + <!-- Include source files in built jar --> + <resource> + <targetPath>src/</targetPath> + <filtering>false</filtering> + <directory>src/main/java</directory> + <includes> + <include>**/*.java</include> + </includes> + </resource> + <!-- Include a log4j configuration in the jar at the root level (don't name this log4j.properties though as won't be able to override it). --> + <resource> + <targetPath>/</targetPath> + <filtering>false</filtering> + <directory>src/main/java</directory> + <includes> + <include>perftests.log4j</include> + </includes> + </resource> + </resources> + </build> + </project> diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java new file mode 100644 index 0000000000..c0f236b833 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -0,0 +1,118 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.SimpleByteBufferAllocator; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.BytesMessage; +import javax.jms.TextMessage; +import javax.jms.Queue; +import javax.jms.DeliveryMode; +import javax.jms.Destination; + +public class TestMessageFactory +{ + private static final String MESSAGE_DATA_BYTES = "-message payload-message paylaod-message payload-message paylaod"; + + public static TextMessage newTextMessage(Session session, int size) throws JMSException + { + return session.createTextMessage(createMessagePayload(size)); + } + + public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException + { + ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true); + JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding); + message.clearBody(); + message.setText(createMessagePayload(size)); + return message; + } + + public static BytesMessage newBytesMessage(Session session, int size) throws JMSException + { + BytesMessage message = session.createBytesMessage(); + message.writeUTF(createMessagePayload(size)); + return message; + } + + public static StreamMessage newStreamMessage(Session session, int size) throws JMSException + { + StreamMessage message = session.createStreamMessage(); + message.writeString(createMessagePayload(size)); + return message; + } + + public static ObjectMessage newObjectMessage(Session session, int size) throws JMSException + { + if (size == 0) + { + return session.createObjectMessage(); + } + else + { + return session.createObjectMessage(createMessagePayload(size)); + } + } + + /** + * Creates an ObjectMessage with given size and sets the JMS properties (JMSReplyTo and DeliveryMode) + * @param session + * @param replyDestination + * @param size + * @param persistent + * @return the new ObjectMessage + * @throws JMSException + */ + public static ObjectMessage newObjectMessage(Session session, Destination replyDestination, int size, boolean persistent) throws JMSException + { + ObjectMessage msg = newObjectMessage(session, size); + + // Set the messages persistent delivery flag. + msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // Ensure that the temporary reply queue is set as the reply to destination for the message. + if (replyDestination != null) + { + msg.setJMSReplyTo(replyDestination); + } + + return msg; + } + + public static String createMessagePayload(int size) + { + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count < size) + { + buf.append(MESSAGE_DATA_BYTES); + count += MESSAGE_DATA_BYTES.length(); + } + if (count < size) + { + buf.append(MESSAGE_DATA_BYTES, 0, size - count); + } + + return buf.toString(); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java new file mode 100644 index 0000000000..7fd91ca39d --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -0,0 +1,90 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.util.List;
+
+import javax.jms.Destination;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+/**
+ * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer}
+ * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues.
+ * It is an all in one ping client, that produces and consumes its own pings.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Create a ping pong producer that listens to its own pings <td> {@link PingPongProducer}
+ * </table>
+ */
+public class PingClient extends PingPongProducer
+{
+ /**
+ * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
+ * for details. This constructor creates ping pong producer but de-registers its reply-to destination message
+ * listener, and replaces it by listening to all of its ping destinations.
+ *
+ * @param brokerDetails The URL of the broker to send pings to.
+ * @param username The username to log onto the broker with.
+ * @param password The password to log onto the broker with.
+ * @param virtualpath The virtual host name to use on the broker.
+ * @param destinationName The name (or root where multiple destinations are used) of the desitination to send
+ * pings to.
+ * @param selector The selector to filter replies with.
+ * @param transacted Indicates whether or not pings are sent and received in transactions.
+ * @param persistent Indicates whether pings are sent using peristent delivery.
+ * @param messageSize Specifies the size of ping messages to send.
+ * @param verbose Indicates that information should be printed to the console on every ping.
+ * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover.
+ * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover.
+ * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
+ * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover.
+ * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all.
+ * @param txBatchSize Specifies the number of pings to send in each transaction.
+ * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
+ * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
+ * possible, with no rate restriction.
+ * @param pubsub
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
+ String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
+ boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
+ int txBatchSize, int noOfDestinations, int rate, boolean pubsub) throws Exception
+ {
+ super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
+ verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
+ pubsub);
+ }
+
+ /**
+ * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
+ * effect of making this pinger listen to its own pings.
+ *
+ * @return The ping destinations.
+ */
+ public List<Destination> getReplyDestinations()
+ {
+ return _pingDestinations;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java new file mode 100644 index 0000000000..87edd31575 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -0,0 +1,452 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.topic.Config;
+
+/**
+ * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
+ * too.
+ *
+ * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Replace the command line parsing with a neater tool.
+ *
+ * @todo Make verbose accept a number of messages, only prints to console every X messages.
+ */
+public class PingPongBouncer implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
+
+ /** The default prefetch size for the message consumer. */
+ private static final int PREFETCH = 1;
+
+ /** The default no local flag for the message consumer. */
+ private static final boolean NO_LOCAL = true;
+
+ private static final String DEFAULT_DESTINATION_NAME = "ping";
+
+ /** The default exclusive flag for the message consumer. */
+ private static final boolean EXCLUSIVE = false;
+
+ /** A convenient formatter to use when time stamping output. */
+ protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ private boolean _verbose = false;
+
+ /** Determines whether this bounce back client bounces back messages persistently. */
+ private boolean _persistent = false;
+
+ private Destination _consumerDestination;
+
+ /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
+ private Destination _lastResponseDest;
+
+ /** The producer for sending replies with. */
+ private MessageProducer _replyProducer;
+
+ /** The consumer session. */
+ private Session _consumerSession;
+
+ /** The producer session. */
+ private Session _producerSession;
+
+ /** Holds the connection to the broker. */
+ private AMQConnection _connection;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ private boolean _isPubSub = false;
+
+ /**
+ * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
+ * failover, immediately before committing a transaction.
+ */
+ protected boolean _failBeforeCommit = false;
+
+ /**
+ * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test
+ * failover, immediate after committing a transaction.
+ */
+ protected boolean _failAfterCommit = false;
+
+ /**
+ * Creates a PingPongBouncer on the specified producer and consumer sessions.
+ *
+ * @param brokerDetails The addresses of the brokers to connect to.
+ * @param username The broker username.
+ * @param password The broker password.
+ * @param virtualpath The virtual host name within the broker.
+ * @param destinationName The name of the queue to receive pings on
+ * (or root of the queue name where many queues are generated).
+ * @param persistent A flag to indicate that persistent message should be used.
+ * @param transacted A flag to indicate that pings should be sent within transactions.
+ * @param selector A message selector to filter received pings with.
+ * @param verbose A flag to indicate that message timings should be sent to the console.
+ *
+ * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
+ */
+ public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
+ boolean pubsub) throws Exception
+ {
+ // Create a client id to uniquely identify this client.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientId = address.getHostName() + System.currentTimeMillis();
+ _verbose = verbose;
+ _persistent = persistent;
+ setPubSub(pubsub);
+ // Connect to the broker.
+ setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+ _logger.info("Connected with URL:" + getConnection().toURL());
+
+ // Set up the failover notifier.
+ getConnection().setConnectionListener(new FailoverNotifier());
+
+ // Create a session to listen for messages on and one to send replies on, transactional depending on the
+ // command line option.
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the queue to listen for message on.
+ createConsumerDestination(destinationName);
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+
+ // Create a producer for the replies, without a default destination.
+ _replyProducer = _producerSession.createProducer(null);
+ _replyProducer.setDisableMessageTimestamp(true);
+ _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws Exception
+ {
+ System.out.println("Starting...");
+
+ // Display help on the command line.
+ if (args.length == 0)
+ {
+ _logger.info("Running test with default values...");
+ //usage();
+ //System.exit(0);
+ }
+
+ // Extract all command line parameters.
+ Config config = new Config();
+ config.setOptions(args);
+ String brokerDetails = config.getHost() + ":" + config.getPort();
+ String virtualpath = "/test";
+ String destinationName = config.getDestination();
+ if (destinationName == null)
+ {
+ destinationName = DEFAULT_DESTINATION_NAME;
+ }
+
+ String selector = config.getSelector();
+ boolean transacted = config.isTransacted();
+ boolean persistent = config.usePersistentMessages();
+ boolean pubsub = config.isPubSub();
+ boolean verbose = true;
+
+ //String selector = null;
+
+ // Instantiate the ping pong client with the command line options and start it running.
+ PingPongBouncer pingBouncer =
+ new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted,
+ selector, verbose, pubsub);
+ pingBouncer.getConnection().start();
+
+ System.out.println("Waiting...");
+ }
+
+ private static void usage()
+ {
+ System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n"
+ + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n"
+ + "-persistent : (true/false). Default is false\n"
+ + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n");
+ }
+
+ /**
+ * This is a callback method that is notified of all messages for which this has been registered as a message
+ * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+ * destination of the message.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ String messageCorrelationId = message.getJMSCorrelationID();
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Get the reply to destination from the message and check it is set.
+ Destination responseDest = message.getJMSReplyTo();
+
+ if (responseDest == null)
+ {
+ _logger.debug("Cannot send reply because reply-to destination is null.");
+
+ return;
+ }
+
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Time to bounce point: " + diff);
+ }
+ }
+
+ // Correlate the reply to the original.
+ message.setJMSCorrelationID(messageCorrelationId);
+
+ // Send the receieved message as the pong reply.
+ _replyProducer.send(responseDest, message);
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Commit the transaction if running in transactional mode.
+ commitTx(_producerSession);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
+ public AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Sets the connection that this ping client is using.
+ *
+ * @param connection The ping connection.
+ */
+ public void setConnection(AMQConnection connection)
+ {
+ this._connection = connection;
+ }
+
+ /**
+ * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
+ *
+ * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
+ */
+ public void setPubSub(boolean pubsub)
+ {
+ _isPubSub = pubsub;
+ }
+
+ /**
+ * Checks whether this client is a p2p or pub/sub ping client.
+ *
+ * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
+ */
+ public boolean isPubSub()
+ {
+ return _isPubSub;
+ }
+
+ /**
+ * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+ * a transactional session, this method does nothing.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+ * after the commit is applied.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ */
+ protected void commitTx(Session session) throws JMSException
+ {
+ if (session.getTransacted())
+ {
+ try
+ {
+ if (_failBeforeCommit)
+ {
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ try
+ {
+ session.rollback();
+ _logger.debug("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+
+ /**
+ * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ *
+ * @param broker The name of the broker to terminate.
+ */
+ protected void doFailover(String broker)
+ {
+ System.out.println("Kill Broker " + broker + " now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+ }
+
+ /**
+ * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ */
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+
+ }
+
+ private void createConsumerDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _consumerDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _consumerDestination = new AMQQueue(name);
+ }
+ }
+
+ /**
+ * A connection listener that logs out any failover complete events. Could do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java new file mode 100644 index 0000000000..885277c533 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -0,0 +1,1127 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.*;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.topic.Config;
+
+import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
+import uk.co.thebadgerset.junit.extensions.Throttle;
+
+/**
+ * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
+ * client (see {@link PingPongBouncer} for the bounce back client).
+ *
+ * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
+ * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
+ * correlation id in the ping to be bounced back in the reply correlation id.
+ *
+ * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
+ * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
+ * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
+ * failover testing.
+ *
+ * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
+ * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
+ * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
+ * also registered to terminate the ping-pong loop cleanly.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a ping and wait for all responses cycle.
+ * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
+ * </table>
+ *
+ * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
+ * Use the rate and throttling only.
+ *
+ * @todo Make shared or unique destinations a configurable option, hard coded to false.
+ */
+public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+
+ /** Holds the name of the property to get the test message size from. */
+ public static final String MESSAGE_SIZE_PROPNAME = "messagesize";
+
+ /** Holds the name of the property to get the ping queue name from. */
+ public static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
+
+ /** Holds the name of the property to get the test delivery mode from. */
+ public static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+ /** Holds the name of the property to get the test transactional mode from. */
+ public static final String TRANSACTED_PROPNAME = "transacted";
+
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "broker";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ public static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+
+ /** Holds the name of the property to get the message rate from. */
+ public static final String RATE_PROPNAME = "rate";
+
+ public static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+
+ /** Holds the true or false depending on wether it is P2P test or PubSub */
+ public static final String IS_PUBSUB_PROPNAME = "pubsub";
+
+ public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit";
+
+ public static final String FAIL_BEFORE_COMMIT_PROPNAME = "FailBeforeCommit";
+
+ public static final String FAIL_AFTER_SEND_PROPNAME = "FailAfterSend";
+
+ public static final String FAIL_BEFORE_SEND_PROPNAME = "FailBeforeSend";
+
+ public static final String FAIL_ONCE_PROPNAME = "FailOnce";
+
+ public static final String USERNAME_PROPNAME = "username";
+
+ public static final String PASSWORD_PROPNAME = "password";
+
+ public static final String SELECTOR_PROPNAME = "selector";
+
+ public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
+
+ /** Holds the name of the property to get the waiting timeout for response messages. */
+ public static final String TIMEOUT_PROPNAME = "timeout";
+
+ public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
+
+ /** Used to set up a default message size. */
+ public static final int DEFAULT_MESSAGE_SIZE = 0;
+
+ /** Holds the name of the default destination to send pings on. */
+ public static final String DEFAULT_PING_DESTINATION_NAME = "ping";
+
+ /** Defines the default number of destinations to ping. */
+ public static final int DEFAULT_DESTINATION_COUNT = 1;
+
+ /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+ public static final int DEFAULT_RATE = 0;
+
+ /** Defines the default wait between pings. */
+ public static final long DEFAULT_SLEEP_TIME = 250;
+
+ /** Default time to wait before assuming that a ping has timed out. */
+ public static final long DEFAULT_TIMEOUT = 9000;
+
+ /** Defines the default number of pings to send in each transaction when running transactionally. */
+ public static final int DEFAULT_TX_BATCH_SIZE = 100;
+
+ /** Defines the default prefetch size to use when consuming messages. */
+ public static final int DEFAULT_PREFETCH = 100;
+
+ /** Defines the default value of the no local flag to use when consuming messages. */
+ public static final boolean DEFAULT_NO_LOCAL = false;
+
+ /** Defines the default value of the exclusive flag to use when consuming messages. */
+ public static final boolean DEFAULT_EXCLUSIVE = false;
+
+ /** Holds the message delivery mode to use for the test. */
+ public static final boolean DEFAULT_PERSISTENT_MODE = false;
+
+ /** Holds the transactional mode to use for the test. */
+ public static final boolean DEFAULT_TRANSACTED = false;
+
+ /** Holds the default broker url for the test. */
+ public static final String DEFAULT_BROKER = "tcp://localhost:5672";
+
+ /** Holds the default virtual path for the test. */
+ public static final String DEFAULT_VIRTUAL_PATH = "test";
+
+ /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+ public static final boolean DEFAULT_PUBSUB = false;
+
+ /** Holds the default broker log on username. */
+ public static final String DEFAULT_USERNAME = "guest";
+
+ /** Holds the default broker log on password. */
+ public static final String DEFAULT_PASSWORD = "guest";
+
+ /** Holds the default message selector. */
+ public static final String DEFAULT_SELECTOR = null;
+
+ /** Holds the default failover after commit test flag. */
+ public static final String DEFAULT_FAIL_AFTER_COMMIT = "false";
+
+ /** Holds the default failover before commit test flag. */
+ public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false";
+
+ /** Holds the default failover after send test flag. */
+ public static final String DEFAULT_FAIL_AFTER_SEND = "false";
+
+ /** Holds the default failover before send test flag. */
+ public static final String DEFAULT_FAIL_BEFORE_SEND = "false";
+
+ /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */
+ public static final String DEFAULT_FAIL_ONCE = "true";
+
+ /** Holds the default verbose mode. */
+ public static final boolean DEFAULT_VERBOSE = false;
+
+ /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
+ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+
+ /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
+ private static AtomicLong idGenerator = new AtomicLong(0L);
+
+ /**
+ * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
+ * multiple ping producers on the same JVM.
+ */
+ private static Map<String, CountDownLatch> trafficLights =
+ Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
+
+ /** A convenient formatter to use when time stamping output. */
+ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+
+ /**
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
+ */
+ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
+
+ /** Destination where the response messages will arrive. */
+ private Destination _replyDestination;
+
+ /** Destination where the producer will be sending message to. */
+ //private Destination _pingDestination;
+
+ /** Determines whether this producer sends persistent messages. */
+ protected boolean _persistent;
+
+ /** Determines what size of messages this producer sends. */
+ protected int _messageSize;
+
+ /** Used to indicate that the ping loop should print out whenever it pings. */
+ protected boolean _verbose = false;
+
+ /** Holds the session on which ping replies are received. */
+ protected Session _consumerSession;
+
+ /** Used to restrict the sending rate to a specified limit. */
+ private Throttle _rateLimiter = null;
+
+ /** Holds a message listener that this message listener chains all its messages to. */
+ private ChainedMessageListener _chainedMessageListener = null;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ protected boolean _isPubSub = false;
+
+ /**
+ * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
+ * on the same JVM using this id generator will allow them to ping on the same queues.
+ */
+ protected AtomicInteger _queueSharedId = new AtomicInteger();
+
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
+
+ /** Holds the connection to the broker. */
+ private Connection _connection;
+
+ /** Holds the producer session, needed to create ping messages. */
+ private Session _producerSession;
+
+ /** Holds the set of destiniations that this ping producer pings. */
+ protected List<Destination> _pingDestinations = new ArrayList<Destination>();
+
+ /** Holds the message producer to send the pings through. */
+ protected MessageProducer _producer;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ protected boolean _failBeforeCommit = false;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ protected boolean _failAfterCommit = false;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ protected boolean _failBeforeSend = false;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ protected boolean _failAfterSend = false;
+
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ protected boolean _failOnce = true;
+
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ protected int _txBatchSize = 1;
+
+ /**
+ * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
+ * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
+ * to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
+ *
+ * @param brokerDetails The URL of the broker to send pings to.
+ * @param username The username to log onto the broker with.
+ * @param password The password to log onto the broker with.
+ * @param virtualpath The virtual host name to use on the broker.
+ * @param destinationName The name (or root where multiple destinations are used) of the desitination to send
+ * pings to.
+ * @param selector The selector to filter replies with.
+ * @param transacted Indicates whether or not pings are sent and received in transactions.
+ * @param persistent Indicates whether pings are sent using peristent delivery.
+ * @param messageSize Specifies the size of ping messages to send.
+ * @param verbose Indicates that information should be printed to the console on every ping.
+ * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover.
+ * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover.
+ * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
+ * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover.
+ * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all.
+ * @param txBatchSize Specifies the number of pings to send in each transaction.
+ * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
+ * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
+ * possible, with no rate restriction.
+ * @param pubsub
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
+ boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
+ boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
+ boolean pubsub) throws Exception
+ {
+ // Check that one or more destinations were specified.
+ if (noOfDestinations < 1)
+ {
+ throw new IllegalArgumentException("There must be at least one destination.");
+ }
+
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
+
+ // Create transactional or non-transactional sessions, based on the command line arguments.
+ _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up a throttle to control the send rate, if a rate > 0 is specified.
+ if (rate > 0)
+ {
+ _rateLimiter = new BatchedThrottle();
+ _rateLimiter.setRate(rate);
+ }
+
+ // Create the temporary queue for replies.
+ _replyDestination = _consumerSession.createTemporaryQueue();
+
+ // Create the producer and the consumers for all reply destinations.
+ createProducer();
+ createPingDestinations(noOfDestinations, selector, destinationName, true);
+ createReplyConsumers(getReplyDestinations(), selector);
+
+ // Keep all the remaining options.
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _failOnce = failOnce;
+ _txBatchSize = txBatchSize;
+ _isPubSub = pubsub;
+ }
+
+ /**
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
+ * to be started to bounce the pings back again.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ // Extract the command line.
+ Config config = new Config();
+ config.setOptions(args);
+ if (args.length == 0)
+ {
+ _logger.info("Running test with default values...");
+ //usage();
+ //System.exit(0);
+ }
+
+ String brokerDetails = config.getHost() + ":" + config.getPort();
+ String virtualpath = "/test";
+ String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
+ boolean verbose = true;
+ boolean transacted = config.isTransacted();
+ boolean persistent = config.usePersistentMessages();
+ int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
+ //int messageCount = config.getMessages();
+ int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
+ int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE;
+ int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
+ boolean pubsub = config.isPubSub();
+
+ String destName = config.getDestination();
+ if (destName == null)
+ {
+ destName = DEFAULT_PING_DESTINATION_NAME;
+ }
+
+ boolean afterCommit = false;
+ boolean beforeCommit = false;
+ boolean afterSend = false;
+ boolean beforeSend = false;
+ boolean failOnce = false;
+
+ for (String arg : args)
+ {
+ if (arg.startsWith("failover:"))
+ {
+ //failover:<before|after>:<send:commit> | failover:once
+ String[] parts = arg.split(":");
+ if (parts.length == 3)
+ {
+ if (parts[2].equals("commit"))
+ {
+ afterCommit = parts[1].equals("after");
+ beforeCommit = parts[1].equals("before");
+ }
+
+ if (parts[2].equals("send"))
+ {
+ afterSend = parts[1].equals("after");
+ beforeSend = parts[1].equals("before");
+ }
+
+ if (parts[1].equals("once"))
+ {
+ failOnce = true;
+ }
+ }
+ else
+ {
+ System.out.println("Unrecognized failover request:" + arg);
+ }
+ }
+ }
+
+ // Create a ping producer to handle the request/wait/reply cycle.
+ PingPongProducer pingProducer =
+ new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+ transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub);
+
+ pingProducer.getConnection().start();
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.run();
+ pingThread.join();
+ }
+
+ /**
+ * Convenience method for a short pause.
+ *
+ * @param sleepTime The time in milliseconds to pause for.
+ */
+ public static void pause(long sleepTime)
+ {
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException ie)
+ { }
+ }
+ }
+
+ /**
+ * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply
+ * to destination of this pinger.
+ *
+ * @return The single reply to destination of this pinger, wrapped in a list.
+ */
+ public List<Destination> getReplyDestinations()
+ {
+ _logger.debug("public List<Destination> getReplyDestinations(): called");
+
+ List<Destination> replyDestinations = new ArrayList<Destination>();
+ replyDestinations.add(_replyDestination);
+
+ return replyDestinations;
+ }
+
+ /**
+ * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+ * flag is set accoring the ping producer creation options.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createProducer() throws JMSException
+ {
+ _logger.debug("public void createProducer(): called");
+
+ _producer = (MessageProducer) _producerSession.createProducer(null);
+ //_producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+
+ /**
+ * Creates consumers for the specified number of destinations. The destinations themselves are also created by
+ * this method.
+ *
+ * @param noOfDestinations The number of destinations to create consumers for.
+ * @param selector The message selector to filter the consumers with.
+ * @param rootName The root of the name, or actual name if only one is being created.
+ * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
+ * the numbering with all pingers on the same JVM.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
+ throws JMSException
+ {
+ _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+ + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
+ + unique + "): called");
+
+ // Create the desired number of ping destinations and consumers for them.
+ for (int i = 0; i < noOfDestinations; i++)
+ {
+ AMQDestination destination = null;
+
+ int id;
+
+ // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
+ if (unique)
+ {
+ id = _queueJVMSequenceID.incrementAndGet();
+ }
+ else
+ {
+ id = _queueSharedId.incrementAndGet();
+ }
+
+ // Check if this is a pub/sub pinger, in which case create topics.
+ if (_isPubSub)
+ {
+ AMQShortString name = new AMQShortString(rootName + id);
+ destination = new AMQTopic(name);
+ }
+ // Otherwise this is a p2p pinger, in which case create queues.
+ else
+ {
+ AMQShortString name = new AMQShortString(rootName + id);
+ destination = new AMQQueue(name, name, false, false, false);
+ }
+
+ // Keep the destination.
+ _pingDestinations.add(destination);
+ }
+ }
+
+ /**
+ * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+ * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected
+ * in the replies map.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+ _logger.debug("public void onMessage(Message message): called");
+
+ try
+ {
+ // Extract the messages correlation id.
+ String correlationID = message.getJMSCorrelationID();
+ _logger.debug("correlationID = " + correlationID);
+
+ // Countdown on the traffic light if there is one for the matching correlation id.
+ CountDownLatch trafficLight = trafficLights.get(correlationID);
+
+ if (trafficLight != null)
+ {
+ _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+
+ // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+ // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for the remaining messages.
+ long trueCount = -1;
+ long remainingCount = -1;
+
+ synchronized (trafficLight)
+ {
+ trafficLight.countDown();
+
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
+
+ _logger.debug("remainingCount = " + remainingCount);
+ _logger.debug("trueCount = " + trueCount);
+
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+ // blocked, even on the last message.
+ if ((remainingCount % _txBatchSize) == 0)
+ {
+ commitTx(_consumerSession);
+ }
+
+ // Forward the message and remaining count to any interested chained message listener.
+ if (_chainedMessageListener != null)
+ {
+ _chainedMessageListener.onMessage(message, (int) remainingCount);
+ }
+
+ // Check if this is the last message, in which case release any waiting producers. This is done
+ // after the transaction has been committed and any listeners notified.
+ if (trueCount == 1)
+ {
+ trafficLight.countDown();
+ }
+ }
+ }
+ else
+ {
+ _logger.debug("There was no thread waiting for reply: " + correlationID);
+ }
+
+ // Print out ping times for every message in verbose mode only.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
+
+ if (timestamp != null)
+ {
+ long diff = System.nanoTime() - timestamp;
+ _logger.trace("Time for round trip (nanos): " + diff);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ _logger.debug("public void onMessage(Message message): ending");
+ }
+
+ /**
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method. This method generates a new unqiue
+ * correlation id for the messages.
+ *
+ * @param message The message to send.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
+ *
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+ * wait for all prematurely.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
+ {
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + "): called");
+
+ // Create a unique correlation id to put on the messages before sending them.
+ String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+
+ return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
+ }
+
+ /**
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
+ * the correlation id.
+ *
+ * @param message The message to send.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
+ * @param messageCorrelationId The message correlation id.
+ *
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+ * wait for all prematurely.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+
+ try
+ {
+ // Create a count down latch to count the number of replies with. This is created before the messages are
+ // sent so that the replies cannot be received before the count down is created.
+ // One is added to this, so that the last reply becomes a special case. The special case is that the
+ // chained message listener must be called before this sender can be unblocked, but that decrementing the
+ // countdown needs to be done before the chained listener can be called.
+ CountDownLatch trafficLight = new CountDownLatch(numPings + 1);
+ trafficLights.put(messageCorrelationId, trafficLight);
+
+ // Send the specifed number of messages.
+ pingNoWaitForReply(message, numPings, messageCorrelationId);
+
+ // Block the current thread until replies to all the message are received, or it times out.
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
+
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+
+ commitTx(_consumerSession);
+
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+
+ return numReplies;
+ }
+ // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
+ // so will be a memory leak if this is not done.
+ finally
+ {
+ trafficLights.remove(messageCorrelationId);
+ }
+ }
+
+ /**
+ * Sends the specified number of ping messages and does not wait for correlating replies.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @param messageCorrelationId A correlation id to place on all messages sent.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
+ {
+ _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+
+ message.setJMSCorrelationID(messageCorrelationId);
+
+ // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
+ // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
+ // needed.
+ boolean committed = false;
+
+ // Send all of the ping messages.
+ for (int i = 0; i < numPings; i++)
+ {
+ // Reset the committed flag to indicate that there are uncommitted messages.
+ committed = false;
+
+ // Re-timestamp the message.
+ message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+ // Round robin the destinations as the messages are sent.
+ //return _destinationCount;
+ sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
+
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ if ((i % _txBatchSize) == 0)
+ {
+ commitTx(_producerSession);
+ committed = true;
+ }
+
+ // Spew out per message timings on every message sonly in verbose mode.
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
+ + messageCorrelationId);
+ }
+ }
+
+ // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
+ if (!committed)
+ {
+ commitTx(_producerSession);
+ }
+ }
+
+ /**
+ * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
+ */
+ public void pingLoop()
+ {
+ try
+ {
+ // Generate a sample message and time stamp it.
+ ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
+ msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+ // Send the message and wait for a reply.
+ pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
+
+ // Introduce a short pause if desired.
+ pause(DEFAULT_SLEEP_TIME);
+ }
+ catch (JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ catch (InterruptedException e)
+ {
+ _publish = false;
+ _logger.debug("There was an interruption: " + e.getMessage(), e);
+ }
+ }
+
+ /*public Destination getReplyDestination()
+ {
+ return _replyDestination;
+ }*/
+
+ /**
+ * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
+ * here.
+ *
+ * @param messageListener The chained message listener.
+ */
+ public void setChainedMessageListener(ChainedMessageListener messageListener)
+ {
+ _chainedMessageListener = messageListener;
+ }
+
+ /**
+ * Removes any chained message listeners from this pinger.
+ */
+ public void removeChainedMessageListener()
+ {
+ _chainedMessageListener = null;
+ }
+
+ /**
+ * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
+ *
+ * @param replyQueue The reply-to destination for the message.
+ * @param messageSize The desired size of the message in bytes.
+ * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+ *
+ * @return A freshly generated test message.
+ *
+ * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+ */
+ public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
+ {
+ ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+
+ // Timestamp the message in nanoseconds.
+ msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+ return msg;
+ }
+
+ /**
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
+ */
+ public void stop()
+ {
+ _publish = false;
+ }
+
+ /**
+ * Implements a ping loop that repeatedly pings until the publish flag becomes false.
+ */
+ public void run()
+ {
+ // Keep running until the publish flag is cleared.
+ while (_publish)
+ {
+ pingLoop();
+ }
+ }
+
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+ * with the runtime system as a shutdown hook.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
+ {
+ public void run()
+ {
+ stop();
+ }
+ });
+ }
+
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+ *
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
+ *
+ * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+ {
+ _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");
+
+ for (Destination destination : destinations)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+ selector);
+ consumer.setMessageListener(this);
+ }
+ }
+
+ /**
+ * Closes the pingers connection.
+ *
+ * @throws JMSException All JMSException are allowed to fall through.
+ */
+ public void close() throws JMSException
+ {
+ _logger.debug("public void close(): called");
+
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+
+ /**
+ * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+ * a transactional session, this method does nothing (unless the failover after send flag is set).
+ *
+ * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
+ * is applied. This flag applies whether the pinger is transactional or not.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+ * after the commit is applied. These flags will only apply if using a transactional pinger.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ *
+ * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional
+ * and non-transactional alike.
+ */
+ protected void commitTx(Session session) throws JMSException
+ {
+ _logger.debug("protected void commitTx(Session session): called");
+
+ _logger.trace("Batch time reached");
+ if (_failAfterSend)
+ {
+ _logger.trace("Batch size reached");
+ if (_failOnce)
+ {
+ _failAfterSend = false;
+ }
+
+ _logger.trace("Failing After Send");
+ doFailover();
+ }
+
+ if (session.getTransacted())
+ {
+ try
+ {
+ if (_failBeforeCommit)
+ {
+ if (_failOnce)
+ {
+ _failBeforeCommit = false;
+ }
+
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ if (_failOnce)
+ {
+ _failAfterCommit = false;
+ }
+
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ // Warn that the bounce back client is not available.
+ if (e.getLinkedException() instanceof AMQNoConsumersException)
+ {
+ _logger.debug("No consumers on queue.");
+ }
+
+ try
+ {
+ session.rollback();
+ _logger.trace("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination
+ * of the ping producer. If an explicit destination is set, this overrides the default.
+ *
+ * @param destination The destination to send to.
+ * @param message The message to send.
+ *
+ * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected void sendMessage(Destination destination, Message message) throws JMSException
+ {
+ if (_failBeforeSend)
+ {
+ if (_failOnce)
+ {
+ _failBeforeSend = false;
+ }
+
+ _logger.trace("Failing Before Send");
+ doFailover();
+ }
+
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
+ }
+
+ /**
+ * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ */
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now then press return");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+ }
+
+ /**
+ * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
+ * {@link PingPongProducer#onMessage} method is called, the chained listener set through the
+ * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
+ * count of messages with that correlation id.
+ *
+ * Provided only one pinger is producing messages with that correlation id, the chained listener will always be
+ * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
+ * still blocked.
+ */
+ public static interface ChainedMessageListener
+ {
+ public void onMessage(Message message, int remainingCount) throws JMSException;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java deleted file mode 100644 index ddee643a76..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.jms.Session; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.AMQException; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.*; -import java.net.InetAddress; -import java.net.UnknownHostException; - -public class ServiceProvidingClient -{ - private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class); - - private MessageProducer _destinationProducer; - - private Destination _responseDest; - - private AMQConnection _connection; - - public ServiceProvidingClient(String brokerDetails, String username, String password, - String clientName, String virtualPath, String serviceName) - throws AMQException, JMSException, URLSyntaxException - { - _connection = new AMQConnection(brokerDetails, username, password, - clientName, virtualPath); - _connection.setConnectionListener(new ConnectionListener() - { - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - return true; - } - - public boolean preResubscribe() - { - return true; - } - - public void failoverComplete() - { - _logger.info("App got failover complete callback"); - } - }); - final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _logger.info("Service (queue) name is '" + serviceName + "'..."); - - AMQQueue destination = new AMQQueue(serviceName); - - MessageConsumer consumer = session.createConsumer(destination, - 100, true, false, null); - - consumer.setMessageListener(new MessageListener() - { - private int _messageCount; - - public void onMessage(Message message) - { - //_logger.info("Got message '" + message + "'"); - - TextMessage tm = (TextMessage) message; - - try - { - Destination responseDest = tm.getJMSReplyTo(); - if (responseDest == null) - { - _logger.info("Producer not created because the response destination is null."); - return; - } - - if (!responseDest.equals(_responseDest)) - { - _responseDest = responseDest; - - _logger.info("About to create a producer"); - _destinationProducer = session.createProducer(responseDest); - _destinationProducer.setDisableMessageTimestamp(true); - _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - _logger.info("After create a producer"); - } - } - catch (JMSException e) - { - _logger.error("Error creating destination"); - } - _messageCount++; - if (_messageCount % 1000 == 0) - { - _logger.info("Received message total: " + _messageCount); - _logger.info("Sending response to '" + _responseDest + "'"); - } - - try - { - String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText(); - TextMessage msg = session.createTextMessage(payload); - if (tm.propertyExists("timeSent")) - { - _logger.info("timeSent property set on message"); - _logger.info("timeSent value is: " + tm.getLongProperty("timeSent")); - msg.setStringProperty("timeSent", tm.getStringProperty("timeSent")); - } - _destinationProducer.send(msg); - if (_messageCount % 1000 == 0) - { - _logger.info("Sent response to '" + _responseDest + "'"); - } - } - catch (JMSException e) - { - _logger.error("Error sending message: " + e, e); - } - } - }); - } - - public void run() throws JMSException - { - _connection.start(); - _logger.info("Waiting..."); - } - - public static void main(String[] args) - { - _logger.info("Starting..."); - - if (args.length < 5) - { - System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]"); - System.exit(1); - } - String clientId = null; - try - { - InetAddress address = InetAddress.getLocalHost(); - clientId = address.getHostName() + System.currentTimeMillis(); - } - catch (UnknownHostException e) - { - _logger.error("Error: " + e, e); - } - - try - { - ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2], - clientId, args[3], args[4]); - client.run(); - } - catch (JMSException e) - { - _logger.error("Error: " + e, e); - } - catch (AMQException e) - { - _logger.error("Error: " + e, e); - } - catch (URLSyntaxException e) - { - _logger.error("Error: " + e, e); - } - - - - } - -} - diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java deleted file mode 100644 index b52d06558a..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.jms.MessageConsumer; -import org.apache.qpid.jms.MessageProducer; -import org.apache.qpid.jms.Session; - -import javax.jms.*; -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * A client that behaves as follows: - * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li> - * <li>Creates a temporary queue</li> - * <li>Creates messages containing a property that is the name of the temporary queue</li> - * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li> - * </ul> - * - */ -public class ServiceRequestingClient implements ExceptionListener -{ - private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class); - - private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk "; - - private String MESSAGE_DATA; - - private AMQConnection _connection; - - private Session _session; - - private long _averageLatency; - - private int _messageCount; - - private volatile boolean _completed; - - private AMQDestination _tempDestination; - - private MessageProducer _producer; - - private Object _waiter; - - private static String createMessagePayload(int size) - { - _log.info("Message size set to " + size + " bytes"); - StringBuffer buf = new StringBuffer(size); - int count = 0; - while (count < size + MESSAGE_DATA_BYTES.length()) - { - buf.append(MESSAGE_DATA_BYTES); - count += MESSAGE_DATA_BYTES.length(); - } - if (count < size) - { - buf.append(MESSAGE_DATA_BYTES, 0, size - count); - } - - return buf.toString(); - } - - private class CallbackHandler implements MessageListener - { - private int _expectedMessageCount; - - private int _actualMessageCount; - - private long _startTime; - - public CallbackHandler(int expectedMessageCount, long startTime) - { - _expectedMessageCount = expectedMessageCount; - _startTime = startTime; - } - - public void onMessage(Message m) - { - if (_log.isDebugEnabled()) - { - _log.debug("Message received: " + m); - } - try - { - m.getPropertyNames(); - if (m.propertyExists("timeSent")) - { - long timeSent = Long.parseLong(m.getStringProperty("timeSent")); - long now = System.currentTimeMillis(); - if (_averageLatency == 0) - { - _averageLatency = now - timeSent; - _log.info("Latency " + _averageLatency); - } - else - { - _log.info("Individual latency: " + (now - timeSent)); - _averageLatency = (_averageLatency + (now - timeSent)) / 2; - _log.info("Average latency now: " + _averageLatency); - } - } - } - catch (JMSException e) - { - _log.error("Error getting latency data: " + e, e); - } - _actualMessageCount++; - if (_actualMessageCount % 1000 == 0) - { - _log.info("Received message count: " + _actualMessageCount); - } - - if (_actualMessageCount == _expectedMessageCount) - { - _completed = true; - notifyWaiter(); - long timeTaken = System.currentTimeMillis() - _startTime; - _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " + - timeTaken + "ms, equivalent to " + - (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second"); - - try - { - _connection.close(); - _log.info("Connection closed"); - } - catch (JMSException e) - { - _log.error("Error closing connection"); - } - } - } - } - - private void notifyWaiter() - { - if (_waiter != null) - { - synchronized (_waiter) - { - _waiter.notify(); - } - } - } - public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password, - String vpath, String commandQueueName, - final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException - { - _messageCount = messageCount; - MESSAGE_DATA = createMessagePayload(messageDataLength); - try - { - createConnection(brokerHosts, clientID, username, password, vpath); - _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - _connection.setExceptionListener(this); - - - AMQQueue destination = new AMQQueue(commandQueueName); - _producer = (MessageProducer) _session.createProducer(destination); - _producer.setDisableMessageTimestamp(true); - _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - _tempDestination = new AMQQueue("TempResponse" + - Long.toString(System.currentTimeMillis()), true); - MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true, - true, null); - - //Send first message, then wait a bit to allow the provider to get initialised - TextMessage first = _session.createTextMessage(MESSAGE_DATA); - first.setJMSReplyTo(_tempDestination); - _producer.send(first); - try - { - Thread.sleep(1000); - } - catch (InterruptedException ignore) - { - } - - //now start the clock and the test... - final long startTime = System.currentTimeMillis(); - - messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime)); - } - catch (JMSException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - - /** - * Run the test and notify an object upon receipt of all responses. - * @param waiter the object that will be notified - * @throws JMSException - */ - public void run(Object waiter) throws JMSException - { - _waiter = waiter; - _connection.start(); - for (int i = 1; i < _messageCount; i++) - { - TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i); - msg.setJMSReplyTo(_tempDestination); - if (i % 1000 == 0) - { - long timeNow = System.currentTimeMillis(); - msg.setStringProperty("timeSent", String.valueOf(timeNow)); - } - _producer.send(msg); - } - _log.info("Finished sending " + _messageCount + " messages"); - } - - public boolean isCompleted() - { - return _completed; - } - - private void createConnection(String brokerHosts, String clientID, String username, String password, - String vpath) throws AMQException, URLSyntaxException - { - _connection = new AMQConnection(brokerHosts, username, password, - clientID, vpath); - } - - /** - * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank - * means the server will allocate a name. - */ - public static void main(String[] args) - { - if (args.length < 6) - { - System.err.println( - "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>"); - } - try - { - int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096; - - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3], - args[4], Integer.parseInt(args[5]), - messageDataLength); - Object waiter = new Object(); - client.run(waiter); - synchronized (waiter) - { - while (!client.isCompleted()) - { - waiter.wait(); - } - } - - } - catch (UnknownHostException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Exception e) - { - System.err.println("Error in client: " + e); - e.printStackTrace(); - } - } - - /** - * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) - */ - public void onException(JMSException e) - { - System.err.println(e.getMessage()); - e.printStackTrace(System.err); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java index bb740f9094..60aa9f3930 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -22,14 +22,12 @@ package org.apache.qpid.topic; import org.apache.qpid.client.AMQSession; import org.apache.qpid.config.ConnectorConfig; -import org.apache.qpid.config.ConnectionFactoryInitialiser; import org.apache.qpid.config.Connector; import org.apache.qpid.config.AbstractConfig; import javax.jms.Connection; -import javax.jms.ConnectionFactory; -class Config extends AbstractConfig implements ConnectorConfig +public class Config extends AbstractConfig implements ConnectorConfig { private String host = "localhost"; @@ -45,23 +43,30 @@ class Config extends AbstractConfig implements ConnectorConfig private int ackMode= AMQSession.NO_ACKNOWLEDGE; private String clientId; private String subscriptionId; + private String selector; + private String destinationName; private boolean persistent; + private boolean transacted; + private int destinationsCount; + private int batchSize; + private int rate; + private boolean ispubsub; public Config() { } - int getAckMode() + public int getAckMode() { return ackMode; } - void setPayload(int payload) + public void setPayload(int payload) { this.payload = payload; } - int getPayload() + public int getPayload() { return payload; } @@ -81,11 +86,26 @@ class Config extends AbstractConfig implements ConnectorConfig this.messages = messages; } - int getMessages() + public int getMessages() { return messages; } + public int getBatchSize() + { + return batchSize; + } + + public int getRate() + { + return rate; + } + + public int getDestinationsCount() + { + return destinationsCount; + } + public String getHost() { return host; @@ -141,21 +161,41 @@ class Config extends AbstractConfig implements ConnectorConfig this.delay = delay; } - String getClientId() + public String getClientId() { return clientId; } - String getSubscriptionId() + public String getSubscriptionId() { return subscriptionId; } - boolean usePersistentMessages() + public String getSelector() + { + return selector; + } + + public String getDestination() + { + return destinationName; + } + + public boolean usePersistentMessages() { return persistent; } + public boolean isTransacted() + { + return transacted; + } + + public boolean isPubSub() + { + return ispubsub; + } + public void setOption(String key, String value) { if("-host".equalsIgnoreCase(key)) @@ -217,6 +257,34 @@ class Config extends AbstractConfig implements ConnectorConfig { persistent = "true".equalsIgnoreCase(value); } + else if("-transacted".equalsIgnoreCase(key)) + { + transacted = "true".equalsIgnoreCase(value); + } + else if ("-destinationscount".equalsIgnoreCase(key)) + { + destinationsCount = parseInt("Bad destinations count", value); + } + else if ("-batchsize".equalsIgnoreCase(key)) + { + batchSize = parseInt("Bad batch size", value); + } + else if ("-rate".equalsIgnoreCase(key)) + { + rate = parseInt("MEssage rate", value); + } + else if("-pubsub".equalsIgnoreCase(key)) + { + ispubsub = "true".equalsIgnoreCase(value); + } + else if("-selector".equalsIgnoreCase(key)) + { + selector = value; + } + else if("-destinationname".equalsIgnoreCase(key)) + { + destinationName = value; + } else { System.out.println("Ignoring unrecognised option: " + key); diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java index d788029ee9..c3b19b558a 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java @@ -51,7 +51,7 @@ public class Publisher implements MessageListener _factory.createControlConsumer().setMessageListener(this); _connection.start(); - if(warmup > 0) + if (warmup > 0) { System.out.println("Runing warmup (" + warmup + " msgs)"); long time = batch(warmup, consumerCount); @@ -59,11 +59,14 @@ public class Publisher implements MessageListener } long[] times = new long[batches]; - for(int i = 0; i < batches; i++) + for (int i = 0; i < batches; i++) { - if(i > 0) Thread.sleep(delay*1000); + if (i > 0) + { + Thread.sleep(delay * 1000); + } times[i] = batch(msgCount, consumerCount); - System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms."); + System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms."); } long min = min(times); @@ -131,7 +134,7 @@ public class Publisher implements MessageListener static long min(long[] times) { long min = times.length > 0 ? times[0] : 0; - for(int i = 0; i < times.length; i++) + for (int i = 0; i < times.length; i++) { min = Math.min(min, times[i]); } @@ -141,7 +144,7 @@ public class Publisher implements MessageListener static long max(long[] times) { long max = times.length > 0 ? times[0] : 0; - for(int i = 0; i < times.length; i++) + for (int i = 0; i < times.length; i++) { max = Math.max(max, times[i]); } @@ -151,14 +154,22 @@ public class Publisher implements MessageListener static long avg(long[] times, long min, long max) { long sum = 0; - for(int i = 0; i < times.length; i++) + for (int i = 0; i < times.length; i++) { sum += times[i]; } - sum -= min; - sum -= max; - return (sum / (times.length - 2)); + int adjustment = 0; + + // Remove min and max if we have run enough batches. + if (times.length > 2) + { + sum -= min; + sum -= max; + adjustment = 2; + } + + return (sum / (times.length - adjustment)); } public static void main(String[] argv) throws Exception diff --git a/java/perftests/src/main/java/perftests.log4j b/java/perftests/src/main/java/perftests.log4j new file mode 100644 index 0000000000..3bd8c201f8 --- /dev/null +++ b/java/perftests/src/main/java/perftests.log4j @@ -0,0 +1,45 @@ +#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.ping=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
+
+
+log4j.logger.uk.co.thebadgerset.junit.extensions=${badger.level}, console
+log4j.additivity.uk.co.thebadgerset.junit.extensions=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+#log4j.appender.console.layout.ConversionPattern=%t %p [%c] %m%n
+
+log4j.appender.fileApp=org.apache.log4j.FileAppender
+log4j.appender.fileApp.file=${log.dir}/perftests.volumetest.log
+log4j.appender.fileApp.Threshold=info
+log4j.appender.fileApp.append=false
+log4j.appender.fileApp.layout=org.apache.log4j.PatternLayout
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..e10e6353b7 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,302 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><td> Responsibilities <th> Collaborations + * <tr><td> Send many ping messages and output timings asynchronously on batches received. + * </table> + */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingAsyncTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testAsyncPingOk(int numPings) throws Exception + { + _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + ObjectMessage msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + + tc.completeTest(true, receivedInBatch); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java new file mode 100644 index 0000000000..620ddd13f7 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -0,0 +1,317 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
+ * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
+ * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
+ * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
+ * waiting until all expected replies are received.
+ *
+ * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
+ * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The frequency
+ * of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the default of every
+ * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ *
+ * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the individual
+ * timed ping was taken from, rather than 1 for a single message. This is so that the total throughput (messages / time)
+ * can be calculated in order to examine the relationship between throughput and latency.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings for sampled individual pings.
+ * </table>
+ */
+public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
+
+ /** Holds the name of the property to get the test results logging batch size. */
+ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize";
+
+ /** Holds the default test results logging batch size. */
+ public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+ /** Used to hold the timing controller passed from the test runner. */
+ private TimingController _timingController;
+
+ /** Used to generate unique correlation ids for each test run. */
+ private AtomicLong corellationIdGenerator = new AtomicLong();
+
+ /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
+ private Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+ /** Holds the batched results listener, that does logging on batch boundaries. */
+ private BatchedResultsListener batchedResultsListener = null;
+
+ /**
+ * Creates a new asynchronous ping performance test with the specified name.
+ *
+ * @param name The test name.
+ */
+ public PingLatencyTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+ Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Latency Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingLatencyTestPerf("testPingLatency"));
+
+ return suite;
+ }
+
+ /**
+ * Accepts a timing controller from the test runner.
+ *
+ * @param timingController The timing controller to register mutliple timings with.
+ */
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ /**
+ * Gets the timing controller passed in by the test runner.
+ *
+ * @return The timing controller passed in by the test runner.
+ */
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+ /**
+ * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
+ * all replies have been received or a time out occurs before exiting this method.
+ *
+ * @param numPings The number of pings to send.
+ */
+ public void testPingLatency(int numPings) throws Exception
+ {
+ _logger.debug("public void testPingLatency(int numPings): called");
+
+ // Ensure that at least one ping was requeusted.
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ PingClient pingClient = perThreadSetup._pingClient;
+
+ // Advance the correlation id of messages to send, to make it unique for this run.
+ String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+ _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+ // Initialize the count and timing controller for the new correlation id.
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ TimingController tc = getTimingController().getControllerForCurrentThread();
+ perCorrelationId._tc = tc;
+ perCorrelationId._expectedCount = numPings;
+ perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+ // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
+ // messages.
+ pingClient.setChainedMessageListener(batchedResultsListener);
+
+ // Generate a sample message of the specified size.
+ ObjectMessage msg =
+ pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the requested number of messages, and wait until they have all been received.
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+
+ // Check that all the replies were received and log a fail if they were not.
+ if (numReplies < numPings)
+ {
+ tc.completeTest(false, 0);
+ }
+
+ // Remove the chained message listener from the ping producer.
+ pingClient.removeChainedMessageListener();
+
+ // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
+ perCorrelationIds.remove(messageCorrelationId);
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ // Call the set up method in the super class. This creates a PingClient pinger.
+ super.threadSetUp();
+
+ // Create the chained message listener, only if it has not already been created. This is set up with the
+ // batch size property, to tell it what batch size to output results on. A synchronized block is used to
+ // ensure that only one thread creates this.
+ synchronized (this)
+ {
+ if (batchedResultsListener == null)
+ {
+ int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+ batchedResultsListener = new BatchedResultsListener(batchSize);
+ }
+ }
+
+ // Get the set up that the super class created.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Register the chained message listener on the pinger to do its asynchronous test timings from.
+ perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
+ * be attached to the pinger, in order to receive notifications about every message received and the number remaining
+ * to be received. Whenever the number remaining crosses a batch size boundary this results listener outputs a test
+ * timing for the actual number of messages received in the current batch.
+ */
+ private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
+ {
+ /** The test results logging batch size. */
+ int _batchSize;
+
+ /**
+ * Creates a results listener on the specified batch size.
+ *
+ * @param batchSize The batch size to use.
+ */
+ public BatchedResultsListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ }
+
+ /**
+ * This callback method is called from all of the pingers that this test creates. It uses the correlation id
+ * from the message to identify the timing controller for the test thread that was responsible for sending those
+ * messages.
+ *
+ * @param message The message.
+ * @param remainingCount The count of messages remaining to be received with a particular correlation id.
+ *
+ * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount) throws JMSException
+ {
+ _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
+
+ // Check if a batch boundary has been crossed.
+ if ((remainingCount % _batchSize) == 0)
+ {
+ // Extract the correlation id from the message.
+ String correlationId = message.getJMSCorrelationID();
+
+ // Get the details for the correlation id and check that they are not null. They can become null
+ // if a test times out.
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
+ if (perCorrelationId != null)
+ {
+ // Get the timing controller and expected count for this correlation id.
+ TimingController tc = perCorrelationId._tc;
+ int expected = perCorrelationId._expectedCount;
+
+ // Extract the send time from the message and work out from the current time, what the ping latency was.
+ // The ping producer time stamps messages in nanoseconds.
+ long startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+ long now = System.nanoTime();
+ long pingTime = now - startTime;
+
+ // Calculate how many messages were actually received in the last batch. This will be the batch size
+ // except where the number expected is not a multiple of the batch size and this is the first remaining
+ // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
+ // size.
+ int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
+
+ // Register a test result for the correlation id.
+ try
+ {
+
+ tc.completeTest(true, receivedInBatch, pingTime);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this. It means the test runner wants to stop as soon as possible.
+ _logger.warn("Got InterruptedException.", e);
+ }
+ }
+ // Else ignore, test timed out. Should log a fail here?
+ }
+ }
+ }
+
+ /**
+ * Holds state specific to each correlation id, needed to output test results. This consists of the count of
+ * the total expected number of messages, and the timing controller for the thread sending those message ids.
+ */
+ private static class PerCorrelationId
+ {
+ public int _expectedCount;
+ public TimingController _tc;
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java new file mode 100644 index 0000000000..c4e72f4bb6 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -0,0 +1,246 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import javax.jms.*;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
+ * simultaneously to simluate many clients/producers/connections.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
+{
+ private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ /** Holds a property reader to extract the test parameters from. */
+ protected ParsedProperties testParameters = new ParsedProperties(System.getProperties());
+
+ public PingTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_RATE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+ Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingTestPerf("testPingOk"));
+
+ return suite;
+ }
+
+ public void testPingOk(int numPings) throws Exception
+ {
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // start the test
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != numPings)
+ {
+ Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+ + numReplies);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+ int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+ int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+ boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+ boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+ boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+ boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+ int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+ Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+
+ // Extract the test set up paramaeters.
+ int destinationscount =
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+
+ // This is synchronized because there is a race condition, which causes one connection to sleep if
+ // all threads try to create connection concurrently.
+ synchronized (this)
+ {
+ // Establish a client to ping a Destination and listen the reply back from same Destination
+ perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
+ selector, transacted, persistent, messageSize, verbose,
+ failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
+ failOnce, batchSize, destinationscount, rate, pubsub);
+ }
+ // Start the client connection
+ perThreadSetup._pingClient.getConnection().start();
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._pingClient.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping client.
+ */
+ protected PingClient _pingClient;
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java new file mode 100644 index 0000000000..81967d332a --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -0,0 +1,259 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import javax.jms.*;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
+ * a producer to a conumer, then the consumer replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
+ * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
+ * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
+ * back on the temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingPongTestPerf extends AsymptoticTestCase
+{
+ private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
+ //private Properties testParameters = System.getProperties();
+ private ParsedProperties testParameters = new ParsedProperties(System.getProperties());
+
+ public PingPongTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_RATE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+ Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingPongTestPerf("testPingPongOk"));
+
+ return suite;
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingPongOk(int numPings) throws Exception
+ {
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the message and wait for a reply.
+ int numReplies =
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.DEFAULT_TIMEOUT);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != numPings)
+ {
+ Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+ int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+ int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+ boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+ boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+ boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+ boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+ int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+ Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+
+ synchronized (this)
+ {
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath,
+ destinationName, persistent, transacted, selector,
+ verbose, pubsub);
+
+ // Start the connections for client and producer running.
+ perThreadSetup._testPingBouncer.getConnection().start();
+
+ // Establish a ping-pong client on the ping queue to send the pings with.
+
+ perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath,
+ destinationName, selector, transacted, persistent,
+ messageSize, verbose, failAfterCommit,
+ failBeforeCommit, failAfterSend, failBeforeSend,
+ failOnce, batchSize, 0, rate, pubsub);
+ perThreadSetup._testPingProducer.getConnection().start();
+ }
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._testPingProducer.close();
+ //perThreadSetup._testPingBouncer.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping-pong producer.
+ */
+ private PingPongProducer _testPingProducer;
+
+ /**
+ * Holds the test ping client.
+ */
+ private PingPongBouncer _testPingBouncer;
+ }
+}
|
