From 633c33f224f3196f3f9bd80bd2e418d8143fea06 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 4 May 2012 15:39:19 +0000 Subject: QPID-3858: Updated branch - merged from trunk r.1333987 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68 --- java/tools/bin/controller | 132 ------ java/tools/bin/jms-quick-perf-report | 137 +++++++ java/tools/bin/mercury-controller | 132 ++++++ java/tools/bin/mercury-start-consumers | 119 ++++++ java/tools/bin/mercury-start-producers | 136 +++++++ java/tools/bin/perf-report | 137 ------- java/tools/bin/start-consumers | 119 ------ java/tools/bin/start-producers | 136 ------- .../src/main/java/org/apache/qpid/tools/Clock.java | 2 + .../org/apache/qpid/tools/JVMArgConfiguration.java | 411 +++++++++++++++++++ .../java/org/apache/qpid/tools/LatencyTest.java | 349 ---------------- .../java/org/apache/qpid/tools/MercuryBase.java | 191 +++++++++ .../qpid/tools/MercuryConsumerController.java | 231 +++++++++++ .../qpid/tools/MercuryProducerController.java | 210 ++++++++++ .../apache/qpid/tools/MercuryTestController.java | 450 +++++++++++++++++++++ .../main/java/org/apache/qpid/tools/PerfBase.java | 226 ----------- .../java/org/apache/qpid/tools/PerfConsumer.java | 325 --------------- .../java/org/apache/qpid/tools/PerfProducer.java | 358 ---------------- .../org/apache/qpid/tools/PerfTestController.java | 442 -------------------- .../java/org/apache/qpid/tools/QpidReceive.java | 181 +++++++++ .../main/java/org/apache/qpid/tools/QpidSend.java | 291 +++++++++++++ .../org/apache/qpid/tools/TestConfiguration.java | 126 ++++++ .../java/org/apache/qpid/tools/TestParams.java | 214 ---------- .../apache/qpid/tools/report/BasicReporter.java | 113 ++++++ .../apache/qpid/tools/report/MercuryReporter.java | 167 ++++++++ .../org/apache/qpid/tools/report/Reporter.java | 40 ++ .../org/apache/qpid/tools/report/Statistics.java | 139 +++++++ 27 files changed, 3076 insertions(+), 2438 deletions(-) delete mode 100644 java/tools/bin/controller create mode 100755 java/tools/bin/jms-quick-perf-report create mode 100644 java/tools/bin/mercury-controller create mode 100644 java/tools/bin/mercury-start-consumers create mode 100644 java/tools/bin/mercury-start-producers delete mode 100755 java/tools/bin/perf-report delete mode 100644 java/tools/bin/start-consumers delete mode 100644 java/tools/bin/start-producers create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java delete mode 100644 java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java delete mode 100644 java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java delete mode 100644 java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java delete mode 100644 java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java delete mode 100644 java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java delete mode 100644 java/tools/src/main/java/org/apache/qpid/tools/TestParams.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java create mode 100644 java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java (limited to 'java/tools') diff --git a/java/tools/bin/controller b/java/tools/bin/controller deleted file mode 100644 index fab8614039..0000000000 --- a/java/tools/bin/controller +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME=controller -CONSUMER_COUNT=1 -PRODUCER_COUNT=1 -DURATION=-1 -TEST_NAME="TEST_NAME" -EXTRA_JVM_ARGS="" - -TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: controller [option].." - - printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" - - printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" - - printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." - - printf "\n%27s\n%32s\n" "-n, --name=" "The name of the test." - - printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -c|--consumer-count) - CONSUMER_COUNT="$2"; shift; shift; continue - ;; - -p|--producer-count) - PRODUCER_COUNT="$2"; shift; shift; continue - ;; - -d|--duration) - DURATION="$2"; shift; shift; continue - ;; - -n|--name) - TEST_NAME="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" - - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -run_controller() -{ - TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" - echo "Running controller with : $TEST_ARGS" > test.out - $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & - waitfor test.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $TEST_NAME -} - -print_result() -{ - prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` - std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev - echo "--------------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT - -rm -rf *.out - -if [ "$DURATION" = -1 ]; then - echo "Test report on " `date +%F` - echo "========================================================================================================" - echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" - echo "--------------------------------------------------------------------------------------------------------" -else - echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." -fi - -run_controller diff --git a/java/tools/bin/jms-quick-perf-report b/java/tools/bin/jms-quick-perf-report new file mode 100755 index 0000000000..7de3f2b602 --- /dev/null +++ b/java/tools/bin/jms-quick-perf-report @@ -0,0 +1,137 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This will run the following test cases defined below and produce +# a report in tabular format. + +QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" +DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" +TOPIC="amq.topic/test" +DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" + +COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep Perf | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +# $1 test name +# $2 consumer options +# $3 producer options +run_testcase() +{ + sh run-sub $COMMON_CONFIG $2 > sub.out & + sh run-pub $COMMON_CONFIG $3 > pub.out & + waitfor pub.out "Controller: Completed the test" + sleep 2 #give a grace period to shutdown + print_result $1 + mv pub.out $1.pub.out + mv sub.out $1.sub.out +} + +print_result() +{ + prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + echo "------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT +rm -rf *.out #cleanup old files. + +echo "Test report on " `date +%F` +echo "================================================================================================" +echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" +echo "------------------------------------------------------------------------------------------------" + +# The message counts and warmup counts are set to very low values for quick testing of the script. +# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k +# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend +# setting very low values to start with and experiment while increasing them slowly. + +# Test 1 Trans Queue +run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" + +# Test 2 Dura Queue +run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 3 Dura Queue Sync +run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 4 Dura Queue Sync Publish and Ack +run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" + +# Test 5 Topic +run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" + +# Test 6 Durable Topic +run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 7 Fanout +run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" + +# Test 8 Small TX +run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" + +# Test 9 Large TX +run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" + +# Test 10 256 MSG +run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 11 512 MSG +run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 12 2048 MSG +run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 13 Random size MSG +run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 14 Random size MSG Durable +run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 15 64K MSG +run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 16 Durable 64K MSG +run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" + +# Test 17 500K MSG +run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" + +# Test 18 Durable 500K MSG +run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/tools/bin/mercury-controller b/java/tools/bin/mercury-controller new file mode 100644 index 0000000000..fab8614039 --- /dev/null +++ b/java/tools/bin/mercury-controller @@ -0,0 +1,132 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME=controller +CONSUMER_COUNT=1 +PRODUCER_COUNT=1 +DURATION=-1 +TEST_NAME="TEST_NAME" +EXTRA_JVM_ARGS="" + +TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: controller [option].." + + printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" + + printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" + + printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." + + printf "\n%27s\n%32s\n" "-n, --name=" "The name of the test." + + printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -c|--consumer-count) + CONSUMER_COUNT="$2"; shift; shift; continue + ;; + -p|--producer-count) + PRODUCER_COUNT="$2"; shift; shift; continue + ;; + -d|--duration) + DURATION="$2"; shift; shift; continue + ;; + -n|--name) + TEST_NAME="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" + + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +run_controller() +{ + TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" + echo "Running controller with : $TEST_ARGS" > test.out + $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & + waitfor test.out "Controller: Completed the test" + sleep 2 #give a grace period to shutdown + print_result $TEST_NAME +} + +print_result() +{ + prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` + std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev + echo "--------------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT + +rm -rf *.out + +if [ "$DURATION" = -1 ]; then + echo "Test report on " `date +%F` + echo "========================================================================================================" + echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" + echo "--------------------------------------------------------------------------------------------------------" +else + echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." +fi + +run_controller diff --git a/java/tools/bin/mercury-start-consumers b/java/tools/bin/mercury-start-consumers new file mode 100644 index 0000000000..c71fc0c21f --- /dev/null +++ b/java/tools/bin/mercury-start-consumers @@ -0,0 +1,119 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="start-consumers" +PROCESS_COUNT=1 +CON_COUNT=1 +MSG_COUNT=10000 +ADDRESS="queue;{create:always}" +UNIQUE_DEST="false" + +EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " + +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ + --long connection-count:,process-count:,create-unique-queues-topics,\ +jvm-args:,queue:,topic:,address:,\ +msg-count:,help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: start-producers [option].." + + printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" + + printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" + + printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" + + printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" + + printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" + + printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" + + printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" + + printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -C|--connection-count) + CON_COUNT="$2"; shift; shift; continue + ;; + -P|--process-count) + PROCESS_COUNT="$2"; shift; shift; continue + ;; + -u|--create-unique-queues-topics) + UNIQUE_DEST="true"; shift; continue + ;; + --queue) + ADDRESS="$2;{create: always}"; shift; shift; continue + ;; + --topic) + ADDRESS="amq.topic/$2"; shift; shift; continue + ;; + --address) + ADDRESS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -c|--msg-count) + MSG_COUNT="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" + +start_consumers() +{ + for ((i=0; i<$PROCESS_COUNT; i++)) + do + if [ "$UNIQUE_DEST" = "true" ]; then + sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & + else + sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & + fi + done +} + +start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" + diff --git a/java/tools/bin/mercury-start-producers b/java/tools/bin/mercury-start-producers new file mode 100644 index 0000000000..7ba0286f7c --- /dev/null +++ b/java/tools/bin/mercury-start-producers @@ -0,0 +1,136 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="start-producers" +PROCESS_COUNT=1 +CON_COUNT=1 +MSG_TYPE="bytes" +WARMUP_MSG_COUNT=1000 +MSG_COUNT=10000 +MSG_SIZE=1024 +ADDRESS="queue;{create:always}" +UNIQUE_DEST="false" + +EXTRA_JVM_ARGS="" +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ + --long connection-count:,process-count:,create-unique-queues-topics,\ +jvm-args:,queue:,topic:,address:,\ +msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: start-producers [option].." + + printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" + + printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" + + printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" + + printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" + + printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" + + printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" + + printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" + + printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" + + printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" + + printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" + + printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -C|--connection-count) + CON_COUNT="$2"; shift; shift; continue + ;; + -P|--process-count) + PROCESS_COUNT="$2"; shift; shift; continue + ;; + -u|--create-unique-queues-topics) + UNIQUE_DEST="true"; shift; continue + ;; + --queue) + ADDRESS="$2;{create: always}"; shift; shift; continue + ;; + --topic) + ADDRESS="amq.topic/$2"; shift; shift; continue + ;; + --address) + ADDRESS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -s|--msg-size) + MSG_SIZE="$2"; shift; shift; continue + ;; + -c|--msg-count) + MSG_COUNT="$2"; shift; shift; continue + ;; + -t|--msg_type) + MSG_TYPE="$2"; shift; shift; continue + ;; + -w|--warmup-msg-count) + WARMUP_MSG_COUNT="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" + +start_producers() +{ + for ((i=0; i<$PROCESS_COUNT; i++)) + do + if [ "$UNIQUE_DEST" = "true" ]; then + sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & + else + sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & + fi + done +} + +start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" + diff --git a/java/tools/bin/perf-report b/java/tools/bin/perf-report deleted file mode 100755 index 7de3f2b602..0000000000 --- a/java/tools/bin/perf-report +++ /dev/null @@ -1,137 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This will run the following test cases defined below and produce -# a report in tabular format. - -QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" -DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" -TOPIC="amq.topic/test" -DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" - -COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep Perf | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -# $1 test name -# $2 consumer options -# $3 producer options -run_testcase() -{ - sh run-sub $COMMON_CONFIG $2 > sub.out & - sh run-pub $COMMON_CONFIG $3 > pub.out & - waitfor pub.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $1 - mv pub.out $1.pub.out - mv sub.out $1.sub.out -} - -print_result() -{ - prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency - echo "------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT -rm -rf *.out #cleanup old files. - -echo "Test report on " `date +%F` -echo "================================================================================================" -echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" -echo "------------------------------------------------------------------------------------------------" - -# The message counts and warmup counts are set to very low values for quick testing of the script. -# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k -# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend -# setting very low values to start with and experiment while increasing them slowly. - -# Test 1 Trans Queue -run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" - -# Test 2 Dura Queue -run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 3 Dura Queue Sync -run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 4 Dura Queue Sync Publish and Ack -run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 5 Topic -run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" - -# Test 6 Durable Topic -run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 7 Fanout -run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" - -# Test 8 Small TX -run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" - -# Test 9 Large TX -run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" - -# Test 10 256 MSG -run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 11 512 MSG -run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 12 2048 MSG -run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 13 Random size MSG -run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 14 Random size MSG Durable -run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 15 64K MSG -run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 16 Durable 64K MSG -run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 17 500K MSG -run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 18 Durable 500K MSG -run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/tools/bin/start-consumers b/java/tools/bin/start-consumers deleted file mode 100644 index c71fc0c21f..0000000000 --- a/java/tools/bin/start-consumers +++ /dev/null @@ -1,119 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-consumers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_COUNT=10000 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " - -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" - -start_consumers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & - else - sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & - fi - done -} - -start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/java/tools/bin/start-producers b/java/tools/bin/start-producers deleted file mode 100644 index 7ba0286f7c..0000000000 --- a/java/tools/bin/start-producers +++ /dev/null @@ -1,136 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-producers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_TYPE="bytes" -WARMUP_MSG_COUNT=1000 -MSG_COUNT=10000 -MSG_SIZE=1024 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS="" -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" - - printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -s|--msg-size) - MSG_SIZE="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - -t|--msg_type) - MSG_TYPE="$2"; shift; shift; continue - ;; - -w|--warmup-msg-count) - WARMUP_MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" - -start_producers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & - else - sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & - fi - done -} - -start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java index 979d2ef76f..4e79dd62a8 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -27,6 +27,8 @@ package org.apache.qpid.tools; public class Clock { + public final static long SEC = 60000; + private static Precision precision; private static long offset = -1; // in nano secs diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java new file mode 100644 index 0000000000..c6abdf6c84 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java @@ -0,0 +1,411 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; + +public class JVMArgConfiguration implements TestConfiguration +{ + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; + + private String address = "queue; {create : always}"; + + private int msg_size = 1024; + + private int random_msg_size_start_from = 1; + + private boolean cacheMessage = false; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private int transaction_size = 0; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + private boolean random_msg_size = false; + + private String msgType = "bytes"; + + private boolean printStdDev = false; + + private int sendRate = 0; + + private boolean externalController = false; + + private boolean useUniqueDest = false; // useful when using multiple connections. + + private int ackFrequency = 100; + + private DecimalFormat df = new DecimalFormat("###.##"); + + private int reportEvery = 0; + + private boolean isReportTotal = false; + + private boolean isReportHeader = true; + + private boolean isReportLatency = false; + + private int sendEOS = 0; + + private int connectionCount = 1; + + private int rollbackFrequency = 0; + + private boolean printHeaders; + + public JVMArgConfiguration() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address",address); + + msg_size = Integer.getInteger("msg-size", 1024); + cacheMessage = Boolean.getBoolean("cache-msg"); + disableMessageID = Boolean.getBoolean("disable-message-id"); + disableTimestamp = Boolean.getBoolean("disable-timestamp"); + durable = Boolean.getBoolean("durable"); + transaction_size = Integer.getInteger("tx",1000); + ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg-count",msg_count); + warmup_count = Integer.getInteger("warmup-count",warmup_count); + random_msg_size = Boolean.getBoolean("random-msg-size"); + msgType = System.getProperty("msg-type","bytes"); + printStdDev = Boolean.getBoolean("print-std-dev"); + sendRate = Integer.getInteger("rate",0); + externalController = Boolean.getBoolean("ext-controller"); + useUniqueDest = Boolean.getBoolean("use-unique-dest"); + random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); + reportEvery = Integer.getInteger("report-every"); + isReportTotal = Boolean.getBoolean("report-total"); + isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); + isReportLatency = Boolean.getBoolean("report-latency"); + sendEOS = Integer.getInteger("send-eos"); + connectionCount = Integer.getInteger("con_count",1); + ackFrequency = Integer.getInteger("ack-frequency"); + rollbackFrequency = Integer.getInteger("rollback-frequency"); + printHeaders = Boolean.getBoolean("print-headers"); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getUrl() + */ + @Override + public String getUrl() + { + return url; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getHost() + */ + @Override + public String getHost() + { + return host; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getPort() + */ + @Override + public int getPort() + { + return port; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAddress() + */ + @Override + public String getAddress() + { + return address; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckMode() + */ + @Override + public int getAckMode() + { + return ack_mode; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() + */ + @Override + public int getMsgCount() + { + return msg_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() + */ + @Override + public int getMsgSize() + { + return msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() + */ + @Override + public int getRandomMsgSizeStartFrom() + { + return random_msg_size_start_from; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDurable() + */ + @Override + public boolean isDurable() + { + return durable; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isTransacted() + */ + @Override + public boolean isTransacted() + { + return transaction_size > 0; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() + */ + @Override + public int getTransactionSize() + { + return transaction_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() + */ + @Override + public int getWarmupCount() + { + return warmup_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() + */ + @Override + public boolean isCacheMessage() + { + return cacheMessage; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() + */ + @Override + public boolean isDisableMessageID() + { + return disableMessageID; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() + */ + @Override + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() + */ + @Override + public boolean isRandomMsgSize() + { + return random_msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMessageType() + */ + @Override + public String getMessageType() + { + return msgType; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() + */ + @Override + public boolean isPrintStdDev() + { + return printStdDev; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getSendRate() + */ + @Override + public int getSendRate() + { + return sendRate; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isExternalController() + */ + @Override + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() + */ + @Override + public boolean isUseUniqueDests() + { + return useUniqueDest; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() + */ + @Override + public int getAckFrequency() + { + return ackFrequency; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#createConnection() + */ + @Override + public Connection createConnection() throws Exception + { + if (getHost().equals("") || getPort() == -1) + { + return new AMQConnection(getUrl()); + } + else + { + return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() + */ + @Override + public DecimalFormat getDecimalFormat() + { + return df; + } + + @Override + public int reportEvery() + { + return reportEvery; + } + + @Override + public boolean isReportTotal() + { + return isReportTotal; + } + + @Override + public boolean isReportHeader() + { + return isReportHeader; + } + + @Override + public boolean isReportLatency() + { + return isReportLatency; + } + + @Override + public int getSendEOS() + { + return sendEOS; + } + + @Override + public int getConnectionCount() + { + return connectionCount; + } + + @Override + public int getRollbackFrequency() + { + return rollbackFrequency; + } + + @Override + public boolean isPrintHeaders() + { + return printHeaders; + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java deleted file mode 100644 index 16149d17c9..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.FileOutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Latency test sends an x number of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y number of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * It is important to have a sufficiently large number for the warmup count to - * ensure the system is in steady state before the test is started. - * - * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) - * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 - * - * The idea is to get a latency sample for the system once it achieves steady state. - * - */ - -public class LatencyTest extends PerfBase implements MessageListener -{ - private MessageProducer producer; - private MessageConsumer consumer; - private Message msg; - private byte[] payload; - private long maxLatency = 0; - private long minLatency = Long.MAX_VALUE; - private long totalLatency = 0; // to calculate avg latency. - private int rcvdMsgCount = 0; - private double stdDev = 0; - private double avgLatency = 0; - private boolean warmup_mode = true; - private boolean transacted = false; - private int transSize = 0; - - private final List latencies; - private final Lock lock = new ReentrantLock(); - private final Condition warmedUp; - private final Condition testCompleted; - - public LatencyTest() - { - super(""); - warmedUp = lock.newCondition(); - testCompleted = lock.newCondition(); - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - latencies = new ArrayList (params.getMsgCount()); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(params.isDurable()? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (params.isCacheMessage()) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - ((BytesMessage)msg).writeBytes(payload); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - int count = params.getWarmupCount(); - for (int i=0; i < count; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - try - { - lock.lock(); - warmedUp.await(); - } - finally - { - lock.unlock(); - } - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - if (warmup_mode) - { - warmup_mode = false; - try - { - lock.lock(); - warmedUp.signal(); - } - finally - { - lock.unlock(); - } - } - else - { - computeStats(); - } - } - else if (!warmup_mode) - { - long time = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = time - msg.getJMSTimestamp(); - latencies.add(latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - private void computeStats() - { - avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double sigma = 0; - - for (long latency: latencies) - { - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - sigma = sigma + Math.pow(latency - avgLatency,2); - } - - stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); - - try - { - lock.lock(); - testCompleted.signal(); - } - finally - { - lock.unlock(); - } - } - - public void writeToFile() throws Exception - { - String fileName = System.getProperty("file"); - PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); - for (long latency: latencies) - { - writer.println(String.valueOf(latency)); - } - writer.flush(); - writer.close(); - } - - public void printToConsole() - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Standard Deviation : "). - append(df.format(stdDev)). - append(" ms").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % transSize == 0)) - { - session.commit(); - } - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - if (params.isTransacted()) - { - session.commit(); - } - } - - public void tearDown() throws Exception - { - try - { - lock.lock(); - testCompleted.await(); - } - finally - { - lock.unlock(); - } - - producer.close(); - consumer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() - { - public void run() - { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); - } - t.start(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java new file mode 100644 index 0000000000..097b021b3e --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java @@ -0,0 +1,191 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.net.InetAddress; +import java.util.UUID; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.messaging.Address; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MercuryBase +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class); + + public final static String CODE = "CODE"; + public final static String ID = "ID"; + public final static String REPLY_ADDR = "REPLY_ADDR"; + public final static String MAX_LATENCY = "MAX_LATENCY"; + public final static String MIN_LATENCY = "MIN_LATENCY"; + public final static String AVG_LATENCY = "AVG_LATENCY"; + public final static String STD_DEV = "STD_DEV"; + public final static String CONS_RATE = "CONS_RATE"; + public final static String PROD_RATE = "PROD_RATE"; + public final static String MSG_COUNT = "MSG_COUNT"; + public final static String TIMESTAMP = "Timestamp"; + + String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); + + TestConfiguration config; + Connection con; + Session session; + Session controllerSession; + Destination dest; + Destination myControlQueue; + Destination controllerQueue; + String id; + String myControlQueueAddr; + + MessageProducer sendToController; + MessageConsumer receiveFromController; + String prefix = ""; + + enum OPCode + { + REGISTER_CONSUMER, REGISTER_PRODUCER, + PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, + CONSUMER_READY, PRODUCER_READY, + PRODUCER_START, + RECEIVED_END_MSG, CONSUMER_STOP, + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, + CONTINUE_TEST, STOP_TEST + }; + + MessageType msgType = MessageType.BYTES; + + public MercuryBase(TestConfiguration config,String prefix) + { + this.config = config; + String host = ""; + try + { + host = InetAddress.getLocalHost().getHostName(); + } + catch (Exception e) + { + } + id = host + "-" + UUID.randomUUID().toString(); + this.prefix = prefix; + this.myControlQueueAddr = id + ";{create: always}"; + } + + public void setUp() throws Exception + { + con = config.createConnection(); + con.start(); + + controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + dest = createDestination(); + controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR); + myControlQueue = session.createQueue(myControlQueueAddr); + msgType = MessageType.getType(config.getMessageType()); + _logger.debug("Using " + msgType + " messages"); + + sendToController = controllerSession.createProducer(controllerQueue); + receiveFromController = controllerSession.createConsumer(myControlQueue); + } + + private Destination createDestination() throws Exception + { + if (config.isUseUniqueDests()) + { + _logger.debug("Prefix : " + prefix); + Address addr = Address.parse(config.getAddress()); + AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress()); + int type = ((AMQSession_0_10)session).resolveAddressType(temp); + + if ( type == AMQDestination.TOPIC_TYPE) + { + addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); + System.out.println("Setting subject : " + addr); + } + else + { + addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); + System.out.println("Setting name : " + addr); + } + + return AMQDestination.createDestination(addr.toString()); + } + else + { + return AMQDestination.createDestination(config.getAddress()); + } + } + + public synchronized void sendMessageToController(MapMessage m) throws Exception + { + m.setString(ID, id); + m.setString(REPLY_ADDR,myControlQueueAddr); + sendToController.send(m); + } + + public void receiveFromController(OPCode expected) throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + _logger.debug("Received Code : " + code); + if (expected != code) + { + throw new Exception("Expected OPCode : " + expected + " but received : " + code); + } + + } + + public boolean continueTest() throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + _logger.debug("Received Code : " + code); + return (code == OPCode.CONTINUE_TEST); + } + + public void tearDown() throws Exception + { + session.close(); + controllerSession.close(); + con.close(); + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} + diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java new file mode 100644 index 0000000000..b35adc45d6 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java @@ -0,0 +1,231 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.tools.report.MercuryReporter; +import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * when the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * + * Throughput + * =========== + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class MercuryConsumerController extends MercuryBase +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class); + MercuryReporter reporter; + TestConfiguration config; + QpidReceive receiver; + + public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix) + { + super(config,prefix); + this.reporter = reporter; + if (_logger.isInfoEnabled()) + { + _logger.info("Consumer ID : " + id); + } + } + + public void setUp() throws Exception + { + super.setUp(); + receiver = new QpidReceive(reporter,config, con,dest); + receiver.setUp(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); + sendMessageToController(m); + } + + public void warmup()throws Exception + { + receiveFromController(OPCode.CONSUMER_STARTWARMUP); + receiver.waitforCompletion(config.getWarmupCount()); + + // It's more realistic for the consumer to signal this. + MapMessage m1 = controllerSession.createMapMessage(); + m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m1); + + MapMessage m2 = controllerSession.createMapMessage(); + m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m2); + } + + public void runReceiver() throws Exception + { + if (_logger.isInfoEnabled()) + { + _logger.info("Consumer: " + id + " Starting iteration......" + "\n"); + } + resetCounters(); + receiver.waitforCompletion(config.getMsgCount()); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); + } + + public void resetCounters() + { + reporter.clear(); + } + + public void sendResults() throws Exception + { + receiveFromController(OPCode.CONSUMER_STOP); + reporter.report(); + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); + m.setDouble(AVG_LATENCY, reporter.getAvgLatency()); + m.setDouble(MIN_LATENCY, reporter.getMinLatency()); + m.setDouble(MAX_LATENCY, reporter.getMaxLatency()); + m.setDouble(STD_DEV, reporter.getStdDev()); + m.setDouble(CONS_RATE, reporter.getRate()); + m.setLong(MSG_COUNT, reporter.getSampleSize()); + sendMessageToController(m); + + reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString()); + reporter.log(new StringBuilder("Consumer rate : "). + append(config.getDecimalFormat().format(reporter.getRate())). + append(" msg/sec").toString()); + reporter.log(new StringBuilder("Avg Latency : "). + append(config.getDecimalFormat().format(reporter.getAvgLatency())). + append(" ms").toString()); + reporter.log(new StringBuilder("Min Latency : "). + append(config.getDecimalFormat().format(reporter.getMinLatency())). + append(" ms").toString()); + reporter.log(new StringBuilder("Max Latency : "). + append(config.getDecimalFormat().format(reporter.getMaxLatency())). + append(" ms").toString()); + if (config.isPrintStdDev()) + { + reporter.log(new StringBuilder("Std Dev : "). + append(reporter.getStdDev()).toString()); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + runReceiver(); + sendResults(); + nextIteration = continueTest(); + } + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true); + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = config.getConnectionCount(); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) + { + final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i); + Runnable r = new Runnable() + { + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } + testCompleted.await(); + reporter.log("Consumers have completed the test......\n"); + } +} \ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java new file mode 100644 index 0000000000..02377bb853 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java @@ -0,0 +1,210 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.tools.report.MercuryReporter; +import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * + */ +public class MercuryProducerController extends MercuryBase +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + MercuryReporter reporter; + QpidSend sender; + + public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix) + { + super(config,prefix); + this.reporter = reporter; + System.out.println("Producer ID : " + id); + } + + public void setUp() throws Exception + { + super.setUp(); + sender = new QpidSend(reporter,config, con,dest); + sender.setUp(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); + sendMessageToController(m); + } + + public void warmup()throws Exception + { + receiveFromController(OPCode.PRODUCER_STARTWARMUP); + if (_logger.isInfoEnabled()) + { + _logger.info("Producer: " + id + " Warming up......"); + } + sender.send(config.getWarmupCount()); + sender.sendEndMessage(); + } + + public void runSender() throws Exception + { + resetCounters(); + receiveFromController(OPCode.PRODUCER_START); + sender.send(config.getMsgCount()); + } + + public void resetCounters() + { + sender.resetCounters(); + } + + public void sendResults() throws Exception + { + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); + msg.setDouble(PROD_RATE, reporter.getRate()); + sendMessageToController(msg); + reporter.log(new StringBuilder("Producer rate: "). + append(config.getDecimalFormat().format(reporter.getRate())). + append(" msg/sec"). + toString()); + } + + @Override + public void tearDown() throws Exception + { + sender.tearDown(); + super.tearDown(); + } + + public void run() + { + try + { + setUp(); + warmup(); + boolean nextIteration = true; + while (nextIteration) + { + if(_logger.isInfoEnabled()) + { + _logger.info("=========================================================\n"); + _logger.info("Producer: " + id + " starting a new iteration ......\n"); + } + runSender(); + sendResults(); + nextIteration = continueTest(); + } + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public void startControllerIfNeeded() + { + if (!config.isExternalController()) + { + final MercuryTestController controller = new MercuryTestController(config); + Runnable r = new Runnable() + { + public void run() + { + controller.run(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating controller thread",e); + } + t.start(); + } + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true); + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = config.getConnectionCount(); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) + { + final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() + { + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } + testCompleted.await(); + reporter.log("Producers have completed the test......"); + } +} \ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java new file mode 100644 index 0000000000..8c66a1e44d --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java @@ -0,0 +1,450 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.io.FileWriter; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.tools.report.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ +public class MercuryTestController extends MercuryBase implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + + long totalTestTime; + + private double avgSystemLatency = 0.0; + private double minSystemLatency = Double.MAX_VALUE; + private double maxSystemLatency = 0; + private double avgSystemLatencyStdDev = 0.0; + + private double avgSystemConsRate = 0.0; + private double maxSystemConsRate = 0.0; + private double minSystemConsRate = Double.MAX_VALUE; + + private double avgSystemProdRate = 0.0; + private double maxSystemProdRate = 0.0; + private double minSystemProdRate = Double.MAX_VALUE; + + private long totalMsgCount = 0; + private double totalSystemThroughput = 0.0; + + private int consumerCount = Integer.getInteger("cons_count", 1); + private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins + private Map consumers; + private Map producers; + + private CountDownLatch consRegistered; + private CountDownLatch prodRegistered; + private CountDownLatch consReady; + private CountDownLatch prodReady; + private CountDownLatch receivedEndMsg; + private CountDownLatch receivedConsStats; + private CountDownLatch receivedProdStats; + + private MessageConsumer consumer; + private boolean printStdDev = false; + private FileWriter writer; + private Reporter report; + + public MercuryTestController(TestConfiguration config) + { + super(config,""); + + consumers = new ConcurrentHashMap(consumerCount); + producers = new ConcurrentHashMap(producerCount); + + consRegistered = new CountDownLatch(consumerCount); + prodRegistered = new CountDownLatch(producerCount); + consReady = new CountDownLatch(consumerCount); + prodReady = new CountDownLatch(producerCount); + printStdDev = config.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; + } + + public void setUp() throws Exception + { + super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } + consumer = controllerSession.createConsumer(controllerQueue); + report.log("\nController: " + producerCount + " producers are expected"); + report.log("Controller: " + consumerCount + " consumers are expected \n"); + consumer.setMessageListener(this); + consRegistered.await(); + prodRegistered.await(); + report.log("\nController: All producers and consumers have registered......\n"); + } + + public void warmup() throws Exception + { + report.log("Controller initiating warm up sequence......"); + sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); + sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); + prodReady.await(); + consReady.await(); + report.log("\nController : All producers and consumers are ready to start the test......\n"); + } + + public void startTest() throws Exception + { + resetCounters(); + report.log("\nController Starting test......"); + long start = Clock.getTime(); + sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); + receivedEndMsg.await(); + totalTestTime = Clock.getTime() - start; + sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); + receivedProdStats.await(); + receivedConsStats.await(); + } + + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + + public void calcStats() throws Exception + { + double totLatency = 0.0; + double totStdDev = 0.0; + double totalConsRate = 0.0; + double totalProdRate = 0.0; + + MapMessage conStat = null; // for error handling + try + { + for (MapMessage m: consumers.values()) + { + conStat = m; + minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); + maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); + totLatency = totLatency + m.getDouble(AVG_LATENCY); + totStdDev = totStdDev + m.getDouble(STD_DEV); + + minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); + maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); + totalConsRate = totalConsRate + m.getDouble(CONS_RATE); + + totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); + } + } + catch(Exception e) + { + System.err.println("Error calculating stats from Consumer : " + conStat); + } + + + MapMessage prodStat = null; // for error handling + try + { + for (MapMessage m: producers.values()) + { + prodStat = m; + minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); + maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); + totalProdRate = totalProdRate + m.getDouble(PROD_RATE); + } + } + catch(Exception e) + { + System.err.println("Error calculating stats from Producer : " + conStat); + } + + avgSystemLatency = totLatency/consumers.size(); + avgSystemLatencyStdDev = totStdDev/consumers.size(); + avgSystemConsRate = totalConsRate/consumers.size(); + avgSystemProdRate = totalProdRate/producers.size(); + + report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + + totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); + } + + public void printResults() throws Exception + { + report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + report.log(new StringBuilder("System Throughput : "). + append(config.getDecimalFormat().format(totalSystemThroughput)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Avg Consumer rate : "). + append(config.getDecimalFormat().format(avgSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Consumer rate : "). + append(config.getDecimalFormat().format(minSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Consumer rate : "). + append(config.getDecimalFormat().format(maxSystemConsRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg Producer rate : "). + append(config.getDecimalFormat().format(avgSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Producer rate : "). + append(config.getDecimalFormat().format(minSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Producer rate : "). + append(config.getDecimalFormat().format(maxSystemProdRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg System Latency : "). + append(config.getDecimalFormat().format(avgSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Min System Latency : "). + append(config.getDecimalFormat().format(minSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Max System Latency : "). + append(config.getDecimalFormat().format(maxSystemLatency)). + append(" ms").toString()); + if (printStdDev) + { + report.log(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev).toString()); + } + } + + private synchronized void sendMessageToNodes(OPCode code,Collection nodes) throws Exception + { + report.log("\nController: Sending code " + code); + MessageProducer tmpProd = controllerSession.createProducer(null); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, code.ordinal()); + for (MapMessage node : nodes) + { + if (node.getString(REPLY_ADDR) == null) + { + report.log("REPLY_ADDR is null " + node); + } + else + { + report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + } + tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); + } + } + + public void onMessage(Message msg) + { + try + { + MapMessage m = (MapMessage)msg; + OPCode code = OPCode.values()[m.getInt(CODE)]; + + report.log("\n---------Controller Received Code : " + code); + report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + + switch (code) + { + case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + report.log("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } + consumers.put(m.getString(ID),m); + consRegistered.countDown(); + break; + + case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + report.log("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } + producers.put(m.getString(ID),m); + prodRegistered.countDown(); + break; + + case CONSUMER_READY : + consReady.countDown(); + break; + + case PRODUCER_READY : + prodReady.countDown(); + break; + + case RECEIVED_END_MSG : + receivedEndMsg.countDown(); + break; + + case RECEIVED_CONSUMER_STATS : + consumers.put(m.getString(ID),m); + receivedConsStats.countDown(); + break; + + case RECEIVED_PRODUCER_STATS : + producers.put(m.getString(ID),m); + receivedProdStats.countDown(); + break; + + default: + throw new Exception("Invalid OPCode " + code); + } + } + catch (Exception e) + { + handleError(e,"Error when receiving messages " + msg); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } + tearDown(); + + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception { + report.log("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(minSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + + public static void main(String[] args) + { + TestConfiguration config = new JVMArgConfiguration(); + MercuryTestController controller = new MercuryTestController(config); + controller.run(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java deleted file mode 100644 index 121e94cea1..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.net.InetAddress; -import java.text.DecimalFormat; -import java.util.UUID; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; - -public class PerfBase -{ - public final static String CODE = "CODE"; - public final static String ID = "ID"; - public final static String REPLY_ADDR = "REPLY_ADDR"; - public final static String MAX_LATENCY = "MAX_LATENCY"; - public final static String MIN_LATENCY = "MIN_LATENCY"; - public final static String AVG_LATENCY = "AVG_LATENCY"; - public final static String STD_DEV = "STD_DEV"; - public final static String CONS_RATE = "CONS_RATE"; - public final static String PROD_RATE = "PROD_RATE"; - public final static String MSG_COUNT = "MSG_COUNT"; - public final static String TIMESTAMP = "Timestamp"; - - String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - - TestParams params; - Connection con; - Session session; - Session controllerSession; - Destination dest; - Destination myControlQueue; - Destination controllerQueue; - DecimalFormat df = new DecimalFormat("###.##"); - String id; - String myControlQueueAddr; - - MessageProducer sendToController; - MessageConsumer receiveFromController; - String prefix = ""; - - enum OPCode { - REGISTER_CONSUMER, REGISTER_PRODUCER, - PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, - CONSUMER_READY, PRODUCER_READY, - PRODUCER_START, - RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, - CONTINUE_TEST, STOP_TEST - }; - - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - - MessageType msgType = MessageType.BYTES; - - public PerfBase(String prefix) - { - params = new TestParams(); - String host = ""; - try - { - host = InetAddress.getLocalHost().getHostName(); - } - catch (Exception e) - { - } - id = host + "-" + UUID.randomUUID().toString(); - this.prefix = prefix; - this.myControlQueueAddr = id + ";{create: always}"; - } - - public void setUp() throws Exception - { - if (params.getHost().equals("") || params.getPort() == -1) - { - con = new AMQConnection(params.getUrl()); - } - else - { - con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); - } - con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - - controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); - myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(params.getMessageType()); - System.out.println("Using " + msgType + " messages"); - - sendToController = controllerSession.createProducer(controllerQueue); - receiveFromController = controllerSession.createConsumer(myControlQueue); - } - - private Destination createDestination() throws Exception - { - if (params.isUseUniqueDests()) - { - System.out.println("Prefix : " + prefix); - Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); - int type = ((AMQSession_0_10)session).resolveAddressType(temp); - - if ( type == AMQDestination.TOPIC_TYPE) - { - addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); - System.out.println("Setting subject : " + addr); - } - else - { - addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); - System.out.println("Setting name : " + addr); - } - - return new AMQAnyDestination(addr); - } - else - { - return new AMQAnyDestination(params.getAddress()); - } - } - - public synchronized void sendMessageToController(MapMessage m) throws Exception - { - m.setString(ID, id); - m.setString(REPLY_ADDR,myControlQueueAddr); - sendToController.send(m); - } - - public void receiveFromController(OPCode expected) throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - if (expected != code) - { - throw new Exception("Expected OPCode : " + expected + " but received : " + code); - } - - } - - public boolean continueTest() throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - return (code == OPCode.CONTINUE_TEST); - } - - public void tearDown() throws Exception - { - session.close(); - controllerSession.close(); - con.close(); - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} - diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java deleted file mode 100644 index b63892bb51..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.thread.Threading; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * - * Throughput - * =========== - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class PerfConsumer extends PerfBase implements MessageListener -{ - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; - - boolean printStdDev = false; - List sample; - - final Object lock = new Object(); - - public PerfConsumer(String prefix) - { - super(prefix); - System.out.println("Consumer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) - { - if (msg.getBooleanProperty("End") == true) - { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); - } - msg = consumer.receive(1000); - } - - if (params.isTransacted()) - { - session.commit(); - } - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); - } - - public void startTest() throws Exception - { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); - resetCounters(); - } - - public void resetCounters() - { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) - { - sample = null; - sample = new ArrayList(params.getMsgCount()); - } - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); - - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); - sendMessageToController(m); - - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); - } - } - - public double calculateStdDev(double mean) - { - double v = 0; - for (double latency: sample) - { - v = v + Math.pow((latency-mean), 2); - } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); - } - - public void onMessage(Message msg) - { - try - { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) - { - ((TextMessage)msg).getText(); - } - - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); - } - else - { - rcvdTime = Clock.getTime(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - - } - testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); - } -} \ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java deleted file mode 100644 index ac6129ab68..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.thread.Threading; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * - */ -public class PerfProducer extends PerfBase -{ - private static long SEC = 60000; - - MessageProducer producer; - Message msg; - Object payload; - List payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) - { - super(prefix); - System.out.println("Producer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(params.getMsgSize()); - } - - producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; - - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); - - for (int i=0; i < params.getWarmupCount() -1; i++) - { - producer.send(getNextMessage()); - } - sendEndMessage(); - - if (params.isTransacted()) - { - session.commit(); - } - } - - public void startTest() throws Exception - { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); - } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); - } - - public void resetCounters() - { - - } - - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); - } - - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); - sendMessageToController(msg); - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public void startControllerIfNeeded() - { - if (!params.isExternalController()) - { - final PerfTestController controller = new PerfTestController(); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); - } - } - - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - final PerfProducer prod = new PerfProducer(scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } - testCompleted.await(); - System.out.println("Producers have completed the test......"); - } -} \ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java deleted file mode 100644 index 5fca1fa4bd..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.FileWriter; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.message.AMQPEncodedMapMessage; - -/** - * The Controller coordinates a test run between a number - * of producers and consumers, configured via -Dprod_count and -Dcons_count. - * - * It waits till all the producers and consumers have registered and then - * conducts a warmup run. Once all consumers and producers have completed - * the warmup run and is ready, it will conduct the actual test run and - * collect all stats from the participants and calculates the system - * throughput, the avg/min/max for producer rates, consumer rates and latency. - * - * These stats are then printed to std out. - * The Controller also prints events to std out to give a running account - * of the test run in progress. Ex registering of participants, starting warmup ..etc. - * This allows a scripting tool to monitor the progress. - * - * The Controller can be run in two modes. - * 1. A single test run (default) where it just runs until the message count specified - * for the producers via -Dmsg_count is sent and received. - * - * 2. Time based, configured via -Dduration=x, where x is in mins. - * In this mode, the Controller repeatedly cycles through the tests (after an initial - * warmup run) until the desired time is reached. If a test run is in progress - * and the time is up, it will allow the run the complete. - * - * After each iteration, the stats will be printed out in csv format to a separate log file. - * System throughput is calculated as follows - * totalMsgCount/(totalTestTime) - */ -public class PerfTestController extends PerfBase implements MessageListener -{ - enum TestMode { SINGLE_RUN, TIME_BASED }; - - TestMode testMode = TestMode.SINGLE_RUN; - - long totalTestTime; - - private double avgSystemLatency = 0.0; - private double minSystemLatency = Double.MAX_VALUE; - private double maxSystemLatency = 0; - private double avgSystemLatencyStdDev = 0.0; - - private double avgSystemConsRate = 0.0; - private double maxSystemConsRate = 0.0; - private double minSystemConsRate = Double.MAX_VALUE; - - private double avgSystemProdRate = 0.0; - private double maxSystemProdRate = 0.0; - private double minSystemProdRate = Double.MAX_VALUE; - - private long totalMsgCount = 0; - private double totalSystemThroughput = 0.0; - - private int consumerCount = Integer.getInteger("cons_count", 1); - private int producerCount = Integer.getInteger("prod_count", 1); - private int duration = Integer.getInteger("duration", -1); // in mins - private Map consumers; - private Map producers; - - private CountDownLatch consRegistered; - private CountDownLatch prodRegistered; - private CountDownLatch consReady; - private CountDownLatch prodReady; - private CountDownLatch receivedEndMsg; - private CountDownLatch receivedConsStats; - private CountDownLatch receivedProdStats; - - private MessageConsumer consumer; - private boolean printStdDev = false; - FileWriter writer; - - public PerfTestController() - { - super(""); - consumers = new ConcurrentHashMap(consumerCount); - producers = new ConcurrentHashMap(producerCount); - - consRegistered = new CountDownLatch(consumerCount); - prodRegistered = new CountDownLatch(producerCount); - consReady = new CountDownLatch(consumerCount); - prodReady = new CountDownLatch(producerCount); - printStdDev = params.isPrintStdDev(); - testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; - } - - public void setUp() throws Exception - { - super.setUp(); - if (testMode == TestMode.TIME_BASED) - { - writer = new FileWriter("stats-csv.log"); - } - consumer = controllerSession.createConsumer(controllerQueue); - System.out.println("\nController: " + producerCount + " producers are expected"); - System.out.println("Controller: " + consumerCount + " consumers are expected \n"); - consumer.setMessageListener(this); - consRegistered.await(); - prodRegistered.await(); - System.out.println("\nController: All producers and consumers have registered......\n"); - } - - public void warmup() throws Exception - { - System.out.println("Controller initiating warm up sequence......"); - sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); - sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); - prodReady.await(); - consReady.await(); - System.out.println("\nController : All producers and consumers are ready to start the test......\n"); - } - - public void startTest() throws Exception - { - resetCounters(); - System.out.println("\nController Starting test......"); - long start = Clock.getTime(); - sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); - receivedEndMsg.await(); - totalTestTime = Clock.getTime() - start; - sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); - receivedProdStats.await(); - receivedConsStats.await(); - } - - public void resetCounters() - { - minSystemLatency = Double.MAX_VALUE; - maxSystemLatency = 0; - maxSystemConsRate = 0.0; - minSystemConsRate = Double.MAX_VALUE; - maxSystemProdRate = 0.0; - minSystemProdRate = Double.MAX_VALUE; - - totalMsgCount = 0; - - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); - } - - public void calcStats() throws Exception - { - double totLatency = 0.0; - double totStdDev = 0.0; - double totalConsRate = 0.0; - double totalProdRate = 0.0; - - MapMessage conStat = null; // for error handling - try - { - for (MapMessage m: consumers.values()) - { - conStat = m; - minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); - maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); - totLatency = totLatency + m.getDouble(AVG_LATENCY); - totStdDev = totStdDev + m.getDouble(STD_DEV); - - minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); - maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); - totalConsRate = totalConsRate + m.getDouble(CONS_RATE); - - totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Consumer : " + conStat); - } - - - MapMessage prodStat = null; // for error handling - try - { - for (MapMessage m: producers.values()) - { - prodStat = m; - minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); - maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); - totalProdRate = totalProdRate + m.getDouble(PROD_RATE); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Producer : " + conStat); - } - - avgSystemLatency = totLatency/consumers.size(); - avgSystemLatencyStdDev = totStdDev/consumers.size(); - avgSystemConsRate = totalConsRate/consumers.size(); - avgSystemProdRate = totalProdRate/producers.size(); - - System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); - - totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); - } - - public void printResults() throws Exception - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(totalSystemThroughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Consumer rate : "). - append(df.format(avgSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Consumer rate : "). - append(df.format(minSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Consumer rate : "). - append(df.format(maxSystemConsRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg Producer rate : "). - append(df.format(avgSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Producer rate : "). - append(df.format(minSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Producer rate : "). - append(df.format(maxSystemProdRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg System Latency : "). - append(df.format(avgSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min System Latency : "). - append(df.format(minSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Max System Latency : "). - append(df.format(maxSystemLatency)). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev)); - } - } - - private synchronized void sendMessageToNodes(OPCode code,Collection nodes) throws Exception - { - System.out.println("\nController: Sending code " + code); - MessageProducer tmpProd = controllerSession.createProducer(null); - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, code.ordinal()); - for (MapMessage node : nodes) - { - if (node.getString(REPLY_ADDR) == null) - { - System.out.println("REPLY_ADDR is null " + node); - } - else - { - System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); - } - tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); - } - } - - public void onMessage(Message msg) - { - try - { - MapMessage m = (MapMessage)msg; - OPCode code = OPCode.values()[m.getInt(CODE)]; - - System.out.println("\n---------Controller Received Code : " + code); - System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); - - switch (code) - { - case REGISTER_CONSUMER : - if (consRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); - break; - } - consumers.put(m.getString(ID),m); - consRegistered.countDown(); - break; - - case REGISTER_PRODUCER : - if (prodRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); - break; - } - producers.put(m.getString(ID),m); - prodRegistered.countDown(); - break; - - case CONSUMER_READY : - consReady.countDown(); - break; - - case PRODUCER_READY : - prodReady.countDown(); - break; - - case RECEIVED_END_MSG : - receivedEndMsg.countDown(); - break; - - case RECEIVED_CONSUMER_STATS : - consumers.put(m.getString(ID),m); - receivedConsStats.countDown(); - break; - - case RECEIVED_PRODUCER_STATS : - producers.put(m.getString(ID),m); - receivedProdStats.countDown(); - break; - - default: - throw new Exception("Invalid OPCode " + code); - } - } - catch (Exception e) - { - handleError(e,"Error when receiving messages " + msg); - } - } - - public void run() - { - try - { - setUp(); - warmup(); - if (testMode == TestMode.SINGLE_RUN) - { - startTest(); - calcStats(); - printResults(); - } - else - { - long startTime = Clock.getTime(); - long timeLimit = duration * 60 * 1000; // duration is in mins. - boolean nextIteration = true; - while (nextIteration) - { - startTest(); - calcStats(); - writeStatsToFile(); - if (Clock.getTime() - startTime < timeLimit) - { - sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); - sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); - nextIteration = true; - } - else - { - nextIteration = false; - } - } - } - tearDown(); - - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception { - System.out.println("Controller: Completed the test......\n"); - if (testMode == TestMode.TIME_BASED) - { - writer.close(); - } - sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); - sendMessageToNodes(OPCode.STOP_TEST,producers.values()); - super.tearDown(); - } - - public void writeStatsToFile() throws Exception - { - writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(df.format(totalSystemThroughput)).append(","); - writer.append(df.format(avgSystemConsRate)).append(","); - writer.append(df.format(minSystemConsRate)).append(","); - writer.append(df.format(maxSystemConsRate)).append(","); - writer.append(df.format(avgSystemProdRate)).append(","); - writer.append(df.format(minSystemProdRate)).append(","); - writer.append(df.format(maxSystemProdRate)).append(","); - writer.append(df.format(avgSystemLatency)).append(","); - writer.append(df.format(minSystemLatency)).append(","); - writer.append(df.format(maxSystemLatency)); - if (printStdDev) - { - writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); - } - writer.append("\n"); - writer.flush(); - } - - public static void main(String[] args) - { - PerfTestController controller = new PerfTestController(); - controller.run(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java new file mode 100644 index 0000000000..02f011f1b9 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.apache.qpid.tools.report.BasicReporter; +import org.apache.qpid.tools.report.Reporter; +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidReceive implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + private final CountDownLatch testCompleted = new CountDownLatch(1); + + private Connection con; + private Session session; + private Destination dest; + private MessageConsumer consumer; + private boolean transacted = false; + private boolean isRollback = false; + private int txSize = 0; + private int rollbackFrequency = 0; + private int ackFrequency = 0; + private int expected = 0; + private int received = 0; + private Reporter report; + private TestConfiguration config; + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else if (config.getAckFrequency() > 0) + { + session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + isRollback = config.getRollbackFrequency() > 0; + rollbackFrequency = config.getRollbackFrequency(); + ackFrequency = config.getAckFrequency(); + } + + public void resetCounters() + { + received = 0; + expected = 0; + report.clear(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && + TestConfiguration.EOS.equals(((TextMessage)msg).getText())) + { + testCompleted.countDown(); + return; + } + + received++; + report.message(msg); + + if (transacted && (received % txSize == 0)) + { + if (isRollback && (received % rollbackFrequency == 0)) + { + session.rollback(); + } + else + { + session.commit(); + } + } + else if (ackFrequency > 0) + { + msg.acknowledge(); + } + + if (expected >= received) + { + testCompleted.countDown(); + } + + } + catch(Exception e) + { + _logger.error("Error when receiving messages",e); + } + + } + + public void waitforCompletion(int expected) throws Exception + { + this.expected = expected; + testCompleted.await(); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); + receiver.setUp(); + receiver.waitforCompletion(config.getMsgCount()); + if (config.isReportTotal()) + { + reporter.report(); + } + receiver.tearDown(); + } + +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java new file mode 100644 index 0000000000..c058b83d41 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.apache.qpid.tools.report.BasicReporter; +import org.apache.qpid.tools.report.Reporter; +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidSend +{ + private Connection con; + private Session session; + private Destination dest; + private MessageProducer producer; + private MessageType msgType; + private Message msg; + private Object payload; + private List payloads; + private boolean cacheMsg = false; + private boolean randomMsgSize = false; + private boolean durable = false; + private Random random; + private int msgSizeRange = 1024; + private int totalMsgCount = 0; + private boolean rateLimitProducer = false; + private boolean transacted = false; + private int txSize = 0; + + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + Reporter report; + TestConfiguration config; + + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + durable = config.isDurable(); + rateLimitProducer = config.getSendRate() > 0 ? true : false; + if (_logger.isDebugEnabled() && rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); + } + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + + msgType = MessageType.getType(config.getMessageType()); + // if message caching is enabled we pre create the message + // else we pre create the payload + if (config.isCacheMessage()) + { + cacheMsg = true; + msg = createMessage(createPayload(config.getMsgSize())); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (config.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = config.getMsgSize(); + payloads = new ArrayList(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(createPayload(i)); + } + } + else + { + payload = createPayload(config.getMsgSize()); + } + + producer = session.createProducer(dest); + if (_logger.isDebugEnabled()) + { + System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); + } + producer.setDisableMessageID(config.isDisableMessageID()); + producer.setDisableMessageTimestamp(config.isDisableTimestamp()); + } + + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + Message m; + + if (!randomMsgSize) + { + m = createMessage(payload); + } + else + { + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + } + m.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return m; + } + } + + public void commit() throws Exception + { + session.commit(); + } + + public void send() throws Exception + { + send(config.getMsgCount()); + } + + public void send(int count) throws Exception + { + int sendRate = config.getSendRate(); + if (rateLimitProducer) + { + int iterations = count/sendRate; + int remainder = count%sendRate; + for (int i=0; i < iterations; i++) + { + long iterationStart = Clock.getTime(); + sendMessages(sendRate); + long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs(); + long diff = Clock.SEC - elapsed; + if (diff > 0) + { + // We have sent more messages in a sec than specified by the rate. + Thread.sleep(diff); + } + } + sendMessages(remainder); + } + else + { + sendMessages(count); + } + } + + private void sendMessages(int count) throws Exception + { + boolean isTimestamp = config.isReportLatency(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + if (isTimestamp) + { + msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime()); + } + producer.send(msg); + report.message(msg); + totalMsgCount++; + + if ( transacted && ((totalMsgCount) % txSize == 0)) + { + session.commit(); + } + } + } + + public void resetCounters() + { + totalMsgCount = 0; + report.clear(); + } + + public void sendEndMessage() throws Exception + { + Message msg = session.createMessage(); + msg.setBooleanProperty(TestConfiguration.EOS, true); + producer.send(msg); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); + sender.setUp(); + sender.send(); + if (config.getSendEOS() > 0) + { + sender.sendEndMessage(); + } + if (config.isReportTotal()) + { + reporter.report(); + } + sender.tearDown(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java new file mode 100644 index 0000000000..7f7df0e5e6 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; + +import javax.jms.Connection; + +public interface TestConfiguration +{ + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + public final static String TIMESTAMP = "ts"; + + public final static String EOS = "eos"; + + public final static String SEQUENCE_NUMBER = "sn"; + + public String getUrl(); + + public String getHost(); + + public int getPort(); + + public String getAddress(); + + public int getAckMode(); + + public int getMsgCount(); + + public int getMsgSize(); + + public int getRandomMsgSizeStartFrom(); + + public boolean isDurable(); + + public boolean isTransacted(); + + public int getTransactionSize(); + + public int getWarmupCount(); + + public boolean isCacheMessage(); + + public boolean isDisableMessageID(); + + public boolean isDisableTimestamp(); + + public boolean isRandomMsgSize(); + + public String getMessageType(); + + public boolean isPrintStdDev(); + + public int getSendRate(); + + public boolean isExternalController(); + + public boolean isUseUniqueDests(); + + public int getAckFrequency(); + + public Connection createConnection() throws Exception; + + public DecimalFormat getDecimalFormat(); + + public int reportEvery(); + + public boolean isReportTotal(); + + public boolean isReportHeader(); + + public boolean isReportLatency(); + + public int getSendEOS(); + + public int getConnectionCount(); + + public int getRollbackFrequency(); + + public boolean isPrintHeaders(); +} \ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java deleted file mode 100644 index d73be0181b..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import javax.jms.Session; - -public class TestParams -{ - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - - private String host = ""; - - private int port = -1; - - private String address = "queue; {create : always}"; - - private int msg_size = 1024; - - private int random_msg_size_start_from = 1; - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private boolean transacted = false; - - private int transaction_size = 1000; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - private String msgType = "bytes"; - - private boolean printStdDev = false; - - private long rate = -1; - - private boolean externalController = false; - - private boolean useUniqueDest = false; // useful when using multiple connections. - - public TestParams() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); - - msg_size = Integer.getInteger("msg_size", 1024); - cacheMessage = Boolean.getBoolean("cache_msg"); - disableMessageID = Boolean.getBoolean("disableMessageID"); - disableTimestamp = Boolean.getBoolean("disableTimestamp"); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - transaction_size = Integer.getInteger("trans_size",1000); - ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg_count",msg_count); - warmup_count = Integer.getInteger("warmup_count",warmup_count); - random_msg_size = Boolean.getBoolean("random_msg_size"); - msgType = System.getProperty("msg_type","bytes"); - printStdDev = Boolean.getBoolean("print_std_dev"); - rate = Long.getLong("rate",-1); - externalController = Boolean.getBoolean("ext_controller"); - useUniqueDest = Boolean.getBoolean("use_unique_dest"); - random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); - } - - public String getUrl() - { - return url; - } - - public String getHost() - { - return host; - } - - public int getPort() - { - return port; - } - - public String getAddress() - { - return address; - } - - public int getAckMode() - { - return ack_mode; - } - - public int getMsgCount() - { - return msg_count; - } - - public int getMsgSize() - { - return msg_size; - } - - public int getRandomMsgSizeStartFrom() - { - return random_msg_size_start_from; - } - - public boolean isDurable() - { - return durable; - } - - public boolean isTransacted() - { - return transacted; - } - - public int getTransactionSize() - { - return transaction_size; - } - - public int getWarmupCount() - { - return warmup_count; - } - - public boolean isCacheMessage() - { - return cacheMessage; - } - - public boolean isDisableMessageID() - { - return disableMessageID; - } - - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public boolean isRandomMsgSize() - { - return random_msg_size; - } - - public String getMessageType() - { - return msgType; - } - - public boolean isPrintStdDev() - { - return printStdDev; - } - - public long getRate() - { - return rate; - } - - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - public boolean isUseUniqueDests() - { - return useUniqueDest; - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java new file mode 100644 index 0000000000..a9896c1d4e --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import java.io.PrintStream; +import java.lang.reflect.Constructor; + +import javax.jms.Message; + +public class BasicReporter implements Reporter +{ + PrintStream out; + int batchSize = 0; + int batchCount = 0; + boolean headerPrinted = false; + protected Statistics overall; + Statistics batch; + Constructor statCtor; + + public BasicReporter(Class clazz, PrintStream out, + int batchSize, boolean printHeader) throws Exception + { + this.out = out; + this.batchSize = batchSize; + this.headerPrinted = !printHeader; + statCtor = clazz.getConstructor(); + overall = statCtor.newInstance(); + if (batchSize > 0) + { + batch = statCtor.newInstance(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#message(javax.jms.Message) + */ + @Override + public void message(Message msg) + { + overall.message(msg); + if (batchSize > 0) + { + batch.message(msg); + if (++batchCount == batchSize) + { + if (!headerPrinted) + { + header(); + } + batch.report(out); + batch.clear(); + batchCount = 0; + } + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#report() + */ + @Override + public void report() + { + if (!headerPrinted) + { + header(); + } + overall.report(out); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#header() + */ + @Override + public void header() + { + headerPrinted = true; + overall.header(out); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#log() + */ + @Override + public void log(String s) + { + // NOOP + } + + @Override + public void clear() + { + batch.clear(); + overall.clear(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java new file mode 100644 index 0000000000..e9bf7100c1 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java @@ -0,0 +1,167 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import java.io.PrintStream; + +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; + +public class MercuryReporter extends BasicReporter +{ + MercuryStatistics stats; + + public MercuryReporter(Class clazz, PrintStream out, + int batchSize, boolean printHeader) throws Exception + { + super(clazz, out, batchSize, printHeader); + stats = (MercuryStatistics)overall; + } + + public double getRate() + { + return stats.getRate(); + } + + public double getAvgLatency() + { + return stats.getAvgLatency(); + } + + public double getStdDev() + { + return stats.getStdDev(); + } + + public double getMinLatency() + { + return stats.getMinLatency(); + } + + public double getMaxLatency() + { + return stats.getMaxLatency(); + } + + public int getSampleSize() + { + return stats.getSampleSize(); + } + + public interface MercuryStatistics extends Statistics + { + public double getRate(); + public long getMinLatency(); + public long getMaxLatency(); + public double getAvgLatency(); + public double getStdDev(); + public int getSampleSize(); + } + + public static class MercuryThroughput extends Throughput implements MercuryStatistics + { + double rate = 0; + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + rate = (double)messages/(double)elapsed; + } + + @Override + public void clear() + { + super.clear(); + rate = 0; + } + + public double getRate() + { + return rate; + } + + public int getSampleSize() + { + return messages; + } + + public long getMinLatency() { return 0; } + public long getMaxLatency() { return 0; } + public double getAvgLatency(){ return 0; } + public double getStdDev(){ return 0; } + + } + + public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics + { + double rate = 0; + double avgLatency = 0; + double stdDev; + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + rate = (double)messages/(double)elapsed; + avgLatency = totalLatency/(double)sampleCount; + } + + @Override + public void clear() + { + super.clear(); + rate = 0; + avgLatency = 0; + } + + public double getRate() + { + return rate; + } + + public long getMinLatency() + { + return minLatency; + } + + public long getMaxLatency() + { + return maxLatency; + } + + public double getAvgLatency() + { + return avgLatency; + } + + public double getStdDev() + { + return stdDev; + } + + public int getSampleSize() + { + return messages; + } + } + +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java new file mode 100644 index 0000000000..5e481458be --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import javax.jms.Message; + +public interface Reporter +{ + + public void message(Message msg); + + public void report(); + + public void header(); + + // Will be used by some reporters to print statements which are greped by + // scripts. Example see java/tools/bin/perf-report + public void log(String s); + + public void clear(); + +} \ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java new file mode 100644 index 0000000000..73efd1f1e0 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import java.io.PrintStream; +import java.text.DecimalFormat; + +import javax.jms.Message; + +public interface Statistics +{ + public void message(Message msg); + public void report(PrintStream out); + public void header(PrintStream out); + public void clear(); + + static class Throughput implements Statistics + { + DecimalFormat df = new DecimalFormat("###.##"); + int messages = 0; + long start = 0; + boolean started = false; + + @Override + public void message(Message msg) + { + ++messages; + if (!started) + { + start = System.currentTimeMillis(); + started = true; + } + } + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + out.print(df.format((double)messages/(double)elapsed)); + } + + @Override + public void header(PrintStream out) + { + out.print("tp(m/s)"); + } + + public void clear() + { + messages = 0; + start = 0; + started = false; + } + } + + static class ThroughputAndLatency extends Throughput + { + long minLatency = Long.MAX_VALUE; + long maxLatency = Long.MIN_VALUE; + double totalLatency = 0; + int sampleCount = 0; + + @Override + public void message(Message msg) + { + super.message(msg); + try + { + long ts = msg.getLongProperty("ts"); + long latency = System.currentTimeMillis() - ts; + minLatency = Math.min(latency, minLatency); + maxLatency = Math.min(latency, maxLatency); + totalLatency = totalLatency + latency; + sampleCount++; + } + catch(Exception e) + { + System.out.println("Error calculating latency"); + } + } + + @Override + public void report(PrintStream out) + { + super.report(out); + double avgLatency = totalLatency/(double)sampleCount; + out.append('\t') + .append(String.valueOf(minLatency)) + .append('\t') + .append(String.valueOf(maxLatency)) + .append('\t') + .append(df.format(avgLatency)); + + out.flush(); + } + + @Override + public void header(PrintStream out) + { + super.header(out); + out.append('\t') + .append("l-min") + .append('\t') + .append("l-max") + .append('\t') + .append("l-avg"); + + out.flush(); + } + + public void clear() + { + super.clear(); + minLatency = 0; + maxLatency = 0; + totalLatency = 0; + sampleCount = 0; + } + } + +} -- cgit v1.2.1