summaryrefslogtreecommitdiff
path: root/java/tools
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /java/tools
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools')
-rwxr-xr-xjava/tools/bin/jms-quick-perf-report (renamed from java/tools/bin/perf-report)0
-rw-r--r--java/tools/bin/mercury-controller (renamed from java/tools/bin/controller)0
-rw-r--r--java/tools/bin/mercury-start-consumers (renamed from java/tools/bin/start-consumers)0
-rw-r--r--java/tools/bin/mercury-start-producers (renamed from java/tools/bin/start-producers)0
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/Clock.java2
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java411
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java349
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java (renamed from java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java)81
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java231
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java210
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java (renamed from java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java)148
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java325
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java358
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java181
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java291
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java126
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java214
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java113
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java167
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java40
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java139
21 files changed, 2012 insertions, 1374 deletions
diff --git a/java/tools/bin/perf-report b/java/tools/bin/jms-quick-perf-report
index 7de3f2b602..7de3f2b602 100755
--- a/java/tools/bin/perf-report
+++ b/java/tools/bin/jms-quick-perf-report
diff --git a/java/tools/bin/controller b/java/tools/bin/mercury-controller
index fab8614039..fab8614039 100644
--- a/java/tools/bin/controller
+++ b/java/tools/bin/mercury-controller
diff --git a/java/tools/bin/start-consumers b/java/tools/bin/mercury-start-consumers
index c71fc0c21f..c71fc0c21f 100644
--- a/java/tools/bin/start-consumers
+++ b/java/tools/bin/mercury-start-consumers
diff --git a/java/tools/bin/start-producers b/java/tools/bin/mercury-start-producers
index 7ba0286f7c..7ba0286f7c 100644
--- a/java/tools/bin/start-producers
+++ b/java/tools/bin/mercury-start-producers
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
index 979d2ef76f..4e79dd62a8 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
@@ -27,6 +27,8 @@ package org.apache.qpid.tools;
public class Clock
{
+ public final static long SEC = 60000;
+
private static Precision precision;
private static long offset = -1; // in nano secs
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
new file mode 100644
index 0000000000..c6abdf6c84
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
@@ -0,0 +1,411 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.text.DecimalFormat;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+
+public class JVMArgConfiguration implements TestConfiguration
+{
+ /*
+ * By default the connection URL is used.
+ * This allows a user to easily specify a fully fledged URL any given property.
+ * Ex. SSL parameters
+ *
+ * By providing a host & port allows a user to simply override the URL.
+ * This allows to create multiple clients in test scripts easily,
+ * without having to deal with the long URL format.
+ */
+ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
+
+ private String host = "";
+
+ private int port = -1;
+
+ private String address = "queue; {create : always}";
+
+ private int msg_size = 1024;
+
+ private int random_msg_size_start_from = 1;
+
+ private boolean cacheMessage = false;
+
+ private boolean disableMessageID = false;
+
+ private boolean disableTimestamp = false;
+
+ private boolean durable = false;
+
+ private int transaction_size = 0;
+
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+ private int msg_count = 10;
+
+ private int warmup_count = 1;
+
+ private boolean random_msg_size = false;
+
+ private String msgType = "bytes";
+
+ private boolean printStdDev = false;
+
+ private int sendRate = 0;
+
+ private boolean externalController = false;
+
+ private boolean useUniqueDest = false; // useful when using multiple connections.
+
+ private int ackFrequency = 100;
+
+ private DecimalFormat df = new DecimalFormat("###.##");
+
+ private int reportEvery = 0;
+
+ private boolean isReportTotal = false;
+
+ private boolean isReportHeader = true;
+
+ private boolean isReportLatency = false;
+
+ private int sendEOS = 0;
+
+ private int connectionCount = 1;
+
+ private int rollbackFrequency = 0;
+
+ private boolean printHeaders;
+
+ public JVMArgConfiguration()
+ {
+
+ url = System.getProperty("url",url);
+ host = System.getProperty("host","");
+ port = Integer.getInteger("port", -1);
+ address = System.getProperty("address",address);
+
+ msg_size = Integer.getInteger("msg-size", 1024);
+ cacheMessage = Boolean.getBoolean("cache-msg");
+ disableMessageID = Boolean.getBoolean("disable-message-id");
+ disableTimestamp = Boolean.getBoolean("disable-timestamp");
+ durable = Boolean.getBoolean("durable");
+ transaction_size = Integer.getInteger("tx",1000);
+ ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE);
+ msg_count = Integer.getInteger("msg-count",msg_count);
+ warmup_count = Integer.getInteger("warmup-count",warmup_count);
+ random_msg_size = Boolean.getBoolean("random-msg-size");
+ msgType = System.getProperty("msg-type","bytes");
+ printStdDev = Boolean.getBoolean("print-std-dev");
+ sendRate = Integer.getInteger("rate",0);
+ externalController = Boolean.getBoolean("ext-controller");
+ useUniqueDest = Boolean.getBoolean("use-unique-dest");
+ random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1);
+ reportEvery = Integer.getInteger("report-every");
+ isReportTotal = Boolean.getBoolean("report-total");
+ isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header");
+ isReportLatency = Boolean.getBoolean("report-latency");
+ sendEOS = Integer.getInteger("send-eos");
+ connectionCount = Integer.getInteger("con_count",1);
+ ackFrequency = Integer.getInteger("ack-frequency");
+ rollbackFrequency = Integer.getInteger("rollback-frequency");
+ printHeaders = Boolean.getBoolean("print-headers");
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getUrl()
+ */
+ @Override
+ public String getUrl()
+ {
+ return url;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getHost()
+ */
+ @Override
+ public String getHost()
+ {
+ return host;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getPort()
+ */
+ @Override
+ public int getPort()
+ {
+ return port;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getAddress()
+ */
+ @Override
+ public String getAddress()
+ {
+ return address;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getAckMode()
+ */
+ @Override
+ public int getAckMode()
+ {
+ return ack_mode;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getMsgCount()
+ */
+ @Override
+ public int getMsgCount()
+ {
+ return msg_count;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getMsgSize()
+ */
+ @Override
+ public int getMsgSize()
+ {
+ return msg_size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom()
+ */
+ @Override
+ public int getRandomMsgSizeStartFrom()
+ {
+ return random_msg_size_start_from;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isDurable()
+ */
+ @Override
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isTransacted()
+ */
+ @Override
+ public boolean isTransacted()
+ {
+ return transaction_size > 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize()
+ */
+ @Override
+ public int getTransactionSize()
+ {
+ return transaction_size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount()
+ */
+ @Override
+ public int getWarmupCount()
+ {
+ return warmup_count;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage()
+ */
+ @Override
+ public boolean isCacheMessage()
+ {
+ return cacheMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID()
+ */
+ @Override
+ public boolean isDisableMessageID()
+ {
+ return disableMessageID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp()
+ */
+ @Override
+ public boolean isDisableTimestamp()
+ {
+ return disableTimestamp;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize()
+ */
+ @Override
+ public boolean isRandomMsgSize()
+ {
+ return random_msg_size;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getMessageType()
+ */
+ @Override
+ public String getMessageType()
+ {
+ return msgType;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev()
+ */
+ @Override
+ public boolean isPrintStdDev()
+ {
+ return printStdDev;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getSendRate()
+ */
+ @Override
+ public int getSendRate()
+ {
+ return sendRate;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isExternalController()
+ */
+ @Override
+ public boolean isExternalController()
+ {
+ return externalController;
+ }
+
+ public void setAddress(String addr)
+ {
+ address = addr;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests()
+ */
+ @Override
+ public boolean isUseUniqueDests()
+ {
+ return useUniqueDest;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency()
+ */
+ @Override
+ public int getAckFrequency()
+ {
+ return ackFrequency;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#createConnection()
+ */
+ @Override
+ public Connection createConnection() throws Exception
+ {
+ if (getHost().equals("") || getPort() == -1)
+ {
+ return new AMQConnection(getUrl());
+ }
+ else
+ {
+ return new AMQConnection(getHost(),getPort(),"guest","guest","test","test");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat()
+ */
+ @Override
+ public DecimalFormat getDecimalFormat()
+ {
+ return df;
+ }
+
+ @Override
+ public int reportEvery()
+ {
+ return reportEvery;
+ }
+
+ @Override
+ public boolean isReportTotal()
+ {
+ return isReportTotal;
+ }
+
+ @Override
+ public boolean isReportHeader()
+ {
+ return isReportHeader;
+ }
+
+ @Override
+ public boolean isReportLatency()
+ {
+ return isReportLatency;
+ }
+
+ @Override
+ public int getSendEOS()
+ {
+ return sendEOS;
+ }
+
+ @Override
+ public int getConnectionCount()
+ {
+ return connectionCount;
+ }
+
+ @Override
+ public int getRollbackFrequency()
+ {
+ return rollbackFrequency;
+ }
+
+ @Override
+ public boolean isPrintHeaders()
+ {
+ return printHeaders;
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
deleted file mode 100644
index 16149d17c9..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.FileOutputStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.thread.Threading;
-
-/**
- * Latency test sends an x number of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y number of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * It is important to have a sufficiently large number for the warmup count to
- * ensure the system is in steady state before the test is started.
- *
- * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
- * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
- *
- * The idea is to get a latency sample for the system once it achieves steady state.
- *
- */
-
-public class LatencyTest extends PerfBase implements MessageListener
-{
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Message msg;
- private byte[] payload;
- private long maxLatency = 0;
- private long minLatency = Long.MAX_VALUE;
- private long totalLatency = 0; // to calculate avg latency.
- private int rcvdMsgCount = 0;
- private double stdDev = 0;
- private double avgLatency = 0;
- private boolean warmup_mode = true;
- private boolean transacted = false;
- private int transSize = 0;
-
- private final List<Long> latencies;
- private final Lock lock = new ReentrantLock();
- private final Condition warmedUp;
- private final Condition testCompleted;
-
- public LatencyTest()
- {
- super("");
- warmedUp = lock.newCondition();
- testCompleted = lock.newCondition();
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- latencies = new ArrayList <Long>(params.getMsgCount());
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(params.isDurable()?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (params.isCacheMessage())
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
- ((BytesMessage)msg).writeBytes(payload);
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- int count = params.getWarmupCount();
- for (int i=0; i < count; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- try
- {
- lock.lock();
- warmedUp.await();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
- {
- if (warmup_mode)
- {
- warmup_mode = false;
- try
- {
- lock.lock();
- warmedUp.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
- else
- {
- computeStats();
- }
- }
- else if (!warmup_mode)
- {
- long time = System.currentTimeMillis();
- rcvdMsgCount ++;
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = time - msg.getJMSTimestamp();
- latencies.add(latency);
- totalLatency = totalLatency + latency;
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- private void computeStats()
- {
- avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double sigma = 0;
-
- for (long latency: latencies)
- {
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- sigma = sigma + Math.pow(latency - avgLatency,2);
- }
-
- stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
-
- try
- {
- lock.lock();
- testCompleted.signal();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- public void writeToFile() throws Exception
- {
- String fileName = System.getProperty("file");
- PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
- for (long latency: latencies)
- {
- writer.println(String.valueOf(latency));
- }
- writer.flush();
- writer.close();
- }
-
- public void printToConsole()
- {
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Standard Deviation : ").
- append(df.format(stdDev)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
- append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
-
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % transSize == 0))
- {
- session.commit();
- }
- }
- Message msg = session.createTextMessage("End");
- producer.send(msg);
- if (params.isTransacted())
- {
- session.commit();
- }
- }
-
- public void tearDown() throws Exception
- {
- try
- {
- lock.lock();
- testCompleted.await();
- }
- finally
- {
- lock.unlock();
- }
-
- producer.close();
- consumer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- final LatencyTest latencyTest = new LatencyTest();
- Runnable r = new Runnable()
- {
- public void run()
- {
- latencyTest.test();
- latencyTest.printToConsole();
- if (System.getProperty("file") != null)
- {
- try
- {
- latencyTest.writeToFile();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating latency test thread",e);
- }
- t.start();
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
index 121e94cea1..097b021b3e 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
@@ -21,7 +21,6 @@
package org.apache.qpid.tools;
import java.net.InetAddress;
-import java.text.DecimalFormat;
import java.util.UUID;
import javax.jms.Connection;
@@ -32,14 +31,17 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.messaging.Address;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class PerfBase
+public class MercuryBase
{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
+
public final static String CODE = "CODE";
public final static String ID = "ID";
public final static String REPLY_ADDR = "REPLY_ADDR";
@@ -54,14 +56,13 @@ public class PerfBase
String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
- TestParams params;
+ TestConfiguration config;
Connection con;
Session session;
Session controllerSession;
Destination dest;
Destination myControlQueue;
Destination controllerQueue;
- DecimalFormat df = new DecimalFormat("###.##");
String id;
String myControlQueueAddr;
@@ -69,7 +70,8 @@ public class PerfBase
MessageConsumer receiveFromController;
String prefix = "";
- enum OPCode {
+ enum OPCode
+ {
REGISTER_CONSUMER, REGISTER_PRODUCER,
PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
CONSUMER_READY, PRODUCER_READY,
@@ -79,39 +81,11 @@ public class PerfBase
CONTINUE_TEST, STOP_TEST
};
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
MessageType msgType = MessageType.BYTES;
- public PerfBase(String prefix)
+ public MercuryBase(TestConfiguration config,String prefix)
{
- params = new TestParams();
+ this.config = config;
String host = "";
try
{
@@ -127,25 +101,16 @@ public class PerfBase
public void setUp() throws Exception
{
- if (params.getHost().equals("") || params.getPort() == -1)
- {
- con = new AMQConnection(params.getUrl());
- }
- else
- {
- con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test");
- }
+ con = config.createConnection();
con.start();
- session = con.createSession(params.isTransacted(),
- params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
dest = createDestination();
- controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR);
myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(params.getMessageType());
- System.out.println("Using " + msgType + " messages");
+ msgType = MessageType.getType(config.getMessageType());
+ _logger.debug("Using " + msgType + " messages");
sendToController = controllerSession.createProducer(controllerQueue);
receiveFromController = controllerSession.createConsumer(myControlQueue);
@@ -153,11 +118,11 @@ public class PerfBase
private Destination createDestination() throws Exception
{
- if (params.isUseUniqueDests())
+ if (config.isUseUniqueDests())
{
- System.out.println("Prefix : " + prefix);
- Address addr = Address.parse(params.getAddress());
- AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ _logger.debug("Prefix : " + prefix);
+ Address addr = Address.parse(config.getAddress());
+ AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress());
int type = ((AMQSession_0_10)session).resolveAddressType(temp);
if ( type == AMQDestination.TOPIC_TYPE)
@@ -171,11 +136,11 @@ public class PerfBase
System.out.println("Setting name : " + addr);
}
- return new AMQAnyDestination(addr);
+ return AMQDestination.createDestination(addr.toString());
}
else
{
- return new AMQAnyDestination(params.getAddress());
+ return AMQDestination.createDestination(config.getAddress());
}
}
@@ -190,7 +155,7 @@ public class PerfBase
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
+ _logger.debug("Received Code : " + code);
if (expected != code)
{
throw new Exception("Expected OPCode : " + expected + " but received : " + code);
@@ -202,7 +167,7 @@ public class PerfBase
{
MapMessage m = (MapMessage)receiveFromController.receive();
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
+ _logger.debug("Received Code : " + code);
return (code == OPCode.CONTINUE_TEST);
}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
new file mode 100644
index 0000000000..b35adc45d6
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.tools.report.MercuryReporter;
+import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PerfConsumer will receive x no of messages in warmup mode.
+ * Once it receives the Start message it will then signal the PerfProducer.
+ * It will start recording stats from the first message it receives after
+ * the warmup mode is done.
+ *
+ * The following calculations are done.
+ * The important numbers to look at is
+ * a) Avg Latency
+ * b) System throughput.
+ *
+ * Latency.
+ * =========
+ * Currently this test is written with the assumption that either
+ * a) The Perf Producer and Consumer are on the same machine
+ * b) They are on separate machines that have their time synced via a Time Server
+ *
+ * In order to calculate latency the producer inserts a timestamp
+ * when the message is sent. The consumer will note the current time the message is
+ * received and will calculate the latency as follows
+ * latency = rcvdTime - msg.getJMSTimestamp()
+ *
+ * Through out the test it will keep track of the max and min latency to show the
+ * variance in latencies.
+ *
+ * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ *
+ * Throughput
+ * ===========
+ * Consumer rate is calculated as
+ * rcvdMsgCount/(rcvdTime - startTime)
+ *
+ * Note that the testStartTime referes to when the producer sent the first message
+ * and startTime is when the consumer first received a message.
+ *
+ * rcvdTime keeps track of when the last message is received.
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ */
+
+public class MercuryConsumerController extends MercuryBase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class);
+ MercuryReporter reporter;
+ TestConfiguration config;
+ QpidReceive receiver;
+
+ public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix)
+ {
+ super(config,prefix);
+ this.reporter = reporter;
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Consumer ID : " + id);
+ }
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ receiver = new QpidReceive(reporter,config, con,dest);
+ receiver.setUp();
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
+ sendMessageToController(m);
+ }
+
+ public void warmup()throws Exception
+ {
+ receiveFromController(OPCode.CONSUMER_STARTWARMUP);
+ receiver.waitforCompletion(config.getWarmupCount());
+
+ // It's more realistic for the consumer to signal this.
+ MapMessage m1 = controllerSession.createMapMessage();
+ m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m1);
+
+ MapMessage m2 = controllerSession.createMapMessage();
+ m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m2);
+ }
+
+ public void runReceiver() throws Exception
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Consumer: " + id + " Starting iteration......" + "\n");
+ }
+ resetCounters();
+ receiver.waitforCompletion(config.getMsgCount());
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
+ }
+
+ public void resetCounters()
+ {
+ reporter.clear();
+ }
+
+ public void sendResults() throws Exception
+ {
+ receiveFromController(OPCode.CONSUMER_STOP);
+ reporter.report();
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
+ m.setDouble(AVG_LATENCY, reporter.getAvgLatency());
+ m.setDouble(MIN_LATENCY, reporter.getMinLatency());
+ m.setDouble(MAX_LATENCY, reporter.getMaxLatency());
+ m.setDouble(STD_DEV, reporter.getStdDev());
+ m.setDouble(CONS_RATE, reporter.getRate());
+ m.setLong(MSG_COUNT, reporter.getSampleSize());
+ sendMessageToController(m);
+
+ reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString());
+ reporter.log(new StringBuilder("Consumer rate : ").
+ append(config.getDecimalFormat().format(reporter.getRate())).
+ append(" msg/sec").toString());
+ reporter.log(new StringBuilder("Avg Latency : ").
+ append(config.getDecimalFormat().format(reporter.getAvgLatency())).
+ append(" ms").toString());
+ reporter.log(new StringBuilder("Min Latency : ").
+ append(config.getDecimalFormat().format(reporter.getMinLatency())).
+ append(" ms").toString());
+ reporter.log(new StringBuilder("Max Latency : ").
+ append(config.getDecimalFormat().format(reporter.getMaxLatency())).
+ append(" ms").toString());
+ if (config.isPrintStdDev())
+ {
+ reporter.log(new StringBuilder("Std Dev : ").
+ append(reporter.getStdDev()).toString());
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Consumer: " + id + " starting a new iteration ......\n");
+ runReceiver();
+ sendResults();
+ nextIteration = continueTest();
+ }
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true);
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = config.getConnectionCount();
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
+ {
+ final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i);
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
+ }
+ testCompleted.await();
+ reporter.log("Consumers have completed the test......\n");
+ }
+} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
new file mode 100644
index 0000000000..02377bb853
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.tools.report.MercuryReporter;
+import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
+ * I have done so far, it seems quite useful to compute the producer rate as it gives an
+ * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
+ * you could clearly see the higher latencies and when producer and consumer rates are very close,
+ * latency is good.
+ *
+ */
+public class MercuryProducerController extends MercuryBase
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
+ MercuryReporter reporter;
+ QpidSend sender;
+
+ public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix)
+ {
+ super(config,prefix);
+ this.reporter = reporter;
+ System.out.println("Producer ID : " + id);
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ sender = new QpidSend(reporter,config, con,dest);
+ sender.setUp();
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
+ sendMessageToController(m);
+ }
+
+ public void warmup()throws Exception
+ {
+ receiveFromController(OPCode.PRODUCER_STARTWARMUP);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Producer: " + id + " Warming up......");
+ }
+ sender.send(config.getWarmupCount());
+ sender.sendEndMessage();
+ }
+
+ public void runSender() throws Exception
+ {
+ resetCounters();
+ receiveFromController(OPCode.PRODUCER_START);
+ sender.send(config.getMsgCount());
+ }
+
+ public void resetCounters()
+ {
+ sender.resetCounters();
+ }
+
+ public void sendResults() throws Exception
+ {
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
+ msg.setDouble(PROD_RATE, reporter.getRate());
+ sendMessageToController(msg);
+ reporter.log(new StringBuilder("Producer rate: ").
+ append(config.getDecimalFormat().format(reporter.getRate())).
+ append(" msg/sec").
+ toString());
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ sender.tearDown();
+ super.tearDown();
+ }
+
+ public void run()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ if(_logger.isInfoEnabled())
+ {
+ _logger.info("=========================================================\n");
+ _logger.info("Producer: " + id + " starting a new iteration ......\n");
+ }
+ runSender();
+ sendResults();
+ nextIteration = continueTest();
+ }
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public void startControllerIfNeeded()
+ {
+ if (!config.isExternalController())
+ {
+ final MercuryTestController controller = new MercuryTestController(config);
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ controller.run();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating controller thread",e);
+ }
+ t.start();
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true);
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = config.getConnectionCount();
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
+ {
+ final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i);
+ prod.startControllerIfNeeded();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
+ }
+ testCompleted.await();
+ reporter.log("Producers have completed the test......");
+ }
+} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
index 5fca1fa4bd..8c66a1e44d 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
@@ -33,6 +33,9 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.tools.report.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The Controller coordinates a test run between a number
@@ -62,8 +65,10 @@ import org.apache.qpid.client.message.AMQPEncodedMapMessage;
* System throughput is calculated as follows
* totalMsgCount/(totalTestTime)
*/
-public class PerfTestController extends PerfBase implements MessageListener
+public class MercuryTestController extends MercuryBase implements MessageListener
{
+ private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
+
enum TestMode { SINGLE_RUN, TIME_BASED };
TestMode testMode = TestMode.SINGLE_RUN;
@@ -102,11 +107,13 @@ public class PerfTestController extends PerfBase implements MessageListener
private MessageConsumer consumer;
private boolean printStdDev = false;
- FileWriter writer;
+ private FileWriter writer;
+ private Reporter report;
- public PerfTestController()
+ public MercuryTestController(TestConfiguration config)
{
- super("");
+ super(config,"");
+
consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
@@ -114,7 +121,7 @@ public class PerfTestController extends PerfBase implements MessageListener
prodRegistered = new CountDownLatch(producerCount);
consReady = new CountDownLatch(consumerCount);
prodReady = new CountDownLatch(producerCount);
- printStdDev = params.isPrintStdDev();
+ printStdDev = config.isPrintStdDev();
testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
}
@@ -126,28 +133,28 @@ public class PerfTestController extends PerfBase implements MessageListener
writer = new FileWriter("stats-csv.log");
}
consumer = controllerSession.createConsumer(controllerQueue);
- System.out.println("\nController: " + producerCount + " producers are expected");
- System.out.println("Controller: " + consumerCount + " consumers are expected \n");
+ report.log("\nController: " + producerCount + " producers are expected");
+ report.log("Controller: " + consumerCount + " consumers are expected \n");
consumer.setMessageListener(this);
consRegistered.await();
prodRegistered.await();
- System.out.println("\nController: All producers and consumers have registered......\n");
+ report.log("\nController: All producers and consumers have registered......\n");
}
public void warmup() throws Exception
{
- System.out.println("Controller initiating warm up sequence......");
+ report.log("Controller initiating warm up sequence......");
sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
prodReady.await();
consReady.await();
- System.out.println("\nController : All producers and consumers are ready to start the test......\n");
+ report.log("\nController : All producers and consumers are ready to start the test......\n");
}
public void startTest() throws Exception
{
resetCounters();
- System.out.println("\nController Starting test......");
+ report.log("\nController Starting test......");
long start = Clock.getTime();
sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
receivedEndMsg.await();
@@ -200,7 +207,7 @@ public class PerfTestController extends PerfBase implements MessageListener
}
catch(Exception e)
{
- System.out.println("Error calculating stats from Consumer : " + conStat);
+ System.err.println("Error calculating stats from Consumer : " + conStat);
}
@@ -217,7 +224,7 @@ public class PerfTestController extends PerfBase implements MessageListener
}
catch(Exception e)
{
- System.out.println("Error calculating stats from Producer : " + conStat);
+ System.err.println("Error calculating stats from Producer : " + conStat);
}
avgSystemLatency = totLatency/consumers.size();
@@ -225,56 +232,56 @@ public class PerfTestController extends PerfBase implements MessageListener
avgSystemConsRate = totalConsRate/consumers.size();
avgSystemProdRate = totalProdRate/producers.size();
- System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
+ report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
}
public void printResults() throws Exception
{
- System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(totalSystemThroughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Consumer rate : ").
- append(df.format(avgSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Consumer rate : ").
- append(df.format(minSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Consumer rate : ").
- append(df.format(maxSystemConsRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg Producer rate : ").
- append(df.format(avgSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Producer rate : ").
- append(df.format(minSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Producer rate : ").
- append(df.format(maxSystemProdRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg System Latency : ").
- append(df.format(avgSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min System Latency : ").
- append(df.format(minSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max System Latency : ").
- append(df.format(maxSystemLatency)).
- append(" ms").toString());
+ report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
+ report.log(new StringBuilder("System Throughput : ").
+ append(config.getDecimalFormat().format(totalSystemThroughput)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Avg Consumer rate : ").
+ append(config.getDecimalFormat().format(avgSystemConsRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Min Consumer rate : ").
+ append(config.getDecimalFormat().format(minSystemConsRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Max Consumer rate : ").
+ append(config.getDecimalFormat().format(maxSystemConsRate)).
+ append(" msg/sec").toString());
+
+ report.log(new StringBuilder("Avg Producer rate : ").
+ append(config.getDecimalFormat().format(avgSystemProdRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Min Producer rate : ").
+ append(config.getDecimalFormat().format(minSystemProdRate)).
+ append(" msg/sec").toString());
+ report.log(new StringBuilder("Max Producer rate : ").
+ append(config.getDecimalFormat().format(maxSystemProdRate)).
+ append(" msg/sec").toString());
+
+ report.log(new StringBuilder("Avg System Latency : ").
+ append(config.getDecimalFormat().format(avgSystemLatency)).
+ append(" ms").toString());
+ report.log(new StringBuilder("Min System Latency : ").
+ append(config.getDecimalFormat().format(minSystemLatency)).
+ append(" ms").toString());
+ report.log(new StringBuilder("Max System Latency : ").
+ append(config.getDecimalFormat().format(maxSystemLatency)).
+ append(" ms").toString());
if (printStdDev)
{
- System.out.println(new StringBuilder("Avg System Std Dev : ").
- append(avgSystemLatencyStdDev));
+ report.log(new StringBuilder("Avg System Std Dev : ").
+ append(avgSystemLatencyStdDev).toString());
}
}
private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
{
- System.out.println("\nController: Sending code " + code);
+ report.log("\nController: Sending code " + code);
MessageProducer tmpProd = controllerSession.createProducer(null);
MapMessage msg = controllerSession.createMapMessage();
msg.setInt(CODE, code.ordinal());
@@ -282,11 +289,11 @@ public class PerfTestController extends PerfBase implements MessageListener
{
if (node.getString(REPLY_ADDR) == null)
{
- System.out.println("REPLY_ADDR is null " + node);
+ report.log("REPLY_ADDR is null " + node);
}
else
{
- System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
+ report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
}
tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
}
@@ -299,16 +306,16 @@ public class PerfTestController extends PerfBase implements MessageListener
MapMessage m = (MapMessage)msg;
OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("\n---------Controller Received Code : " + code);
- System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
+ report.log("\n---------Controller Received Code : " + code);
+ report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
switch (code)
{
case REGISTER_CONSUMER :
if (consRegistered.getCount() == 0)
{
- System.out.println("Warning : Expected number of consumers have already registered," +
- "ignoring extra consumer");
+ report.log("Warning : Expected number of consumers have already registered," +
+ "ignoring extra consumer");
break;
}
consumers.put(m.getString(ID),m);
@@ -318,8 +325,8 @@ public class PerfTestController extends PerfBase implements MessageListener
case REGISTER_PRODUCER :
if (prodRegistered.getCount() == 0)
{
- System.out.println("Warning : Expected number of producers have already registered," +
- "ignoring extra producer");
+ report.log("Warning : Expected number of producers have already registered," +
+ "ignoring extra producer");
break;
}
producers.put(m.getString(ID),m);
@@ -403,7 +410,7 @@ public class PerfTestController extends PerfBase implements MessageListener
@Override
public void tearDown() throws Exception {
- System.out.println("Controller: Completed the test......\n");
+ report.log("Controller: Completed the test......\n");
if (testMode == TestMode.TIME_BASED)
{
writer.close();
@@ -416,16 +423,16 @@ public class PerfTestController extends PerfBase implements MessageListener
public void writeStatsToFile() throws Exception
{
writer.append(String.valueOf(totalMsgCount)).append(",");
- writer.append(df.format(totalSystemThroughput)).append(",");
- writer.append(df.format(avgSystemConsRate)).append(",");
- writer.append(df.format(minSystemConsRate)).append(",");
- writer.append(df.format(maxSystemConsRate)).append(",");
- writer.append(df.format(avgSystemProdRate)).append(",");
- writer.append(df.format(minSystemProdRate)).append(",");
- writer.append(df.format(maxSystemProdRate)).append(",");
- writer.append(df.format(avgSystemLatency)).append(",");
- writer.append(df.format(minSystemLatency)).append(",");
- writer.append(df.format(maxSystemLatency));
+ writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(",");
+ writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(",");
+ writer.append(config.getDecimalFormat().format(minSystemLatency)).append(",");
+ writer.append(config.getDecimalFormat().format(maxSystemLatency));
if (printStdDev)
{
writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
@@ -436,7 +443,8 @@ public class PerfTestController extends PerfBase implements MessageListener
public static void main(String[] args)
{
- PerfTestController controller = new PerfTestController();
+ TestConfiguration config = new JVMArgConfiguration();
+ MercuryTestController controller = new MercuryTestController(config);
controller.run();
}
}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
deleted file mode 100644
index b63892bb51..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- *
- * Throughput
- * ===========
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class PerfConsumer extends PerfBase implements MessageListener
-{
- MessageConsumer consumer;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- long startTime = 0; // to measure consumer throughput
- long rcvdTime = 0;
- boolean transacted = false;
- int transSize = 0;
-
- boolean printStdDev = false;
- List<Long> sample;
-
- final Object lock = new Object();
-
- public PerfConsumer(String prefix)
- {
- super(prefix);
- System.out.println("Consumer ID : " + id);
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
- System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
-
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- printStdDev = params.isPrintStdDev();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- Message msg = consumer.receive();
- // This is to ensure we drain the queue before we start the actual test.
- while ( msg != null)
- {
- if (msg.getBooleanProperty("End") == true)
- {
- // It's more realistic for the consumer to signal this.
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m);
- }
- msg = consumer.receive(1000);
- }
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m);
- consumer.setMessageListener(this);
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Consumer: " + id + " Starting test......" + "\n");
- resetCounters();
- }
-
- public void resetCounters()
- {
- rcvdMsgCount = 0;
- maxLatency = 0;
- minLatency = Long.MAX_VALUE;
- totalLatency = 0;
- if (printStdDev)
- {
- sample = null;
- sample = new ArrayList<Long>(params.getMsgCount());
- }
- }
-
- public void sendResults() throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STOP);
-
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
- double stdDev = 0.0;
- if (printStdDev)
- {
- stdDev = calculateStdDev(avgLatency);
- }
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
- m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
- m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
- m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
- m.setDouble(CONS_RATE, consRate);
- m.setLong(MSG_COUNT, rcvdMsgCount);
- sendMessageToController(m);
-
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Consumer rate : ").
- append(df.format(consRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(df.format(minLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(df.format(maxLatency/Clock.convertToMiliSecs())).
- append(" ms").toString());
- if (printStdDev)
- {
- System.out.println(new StringBuilder("Std Dev : ").
- append(stdDev/Clock.convertToMiliSecs()).toString());
- }
- }
-
- public double calculateStdDev(double mean)
- {
- double v = 0;
- for (double latency: sample)
- {
- v = v + Math.pow((latency-mean), 2);
- }
- v = v/sample.size();
- return Math.round(Math.sqrt(v));
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- // To figure out the decoding overhead of text
- if (msgType == MessageType.TEXT)
- {
- ((TextMessage)msg).getText();
- }
-
- if (msg.getBooleanProperty("End"))
- {
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
- }
- else
- {
- rcvdTime = Clock.getTime();
- rcvdMsgCount ++;
-
- if (rcvdMsgCount == 1)
- {
- startTime = rcvdTime;
- }
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- totalLatency = totalLatency + latency;
- if (printStdDev)
- {
- sample.add(latency);
- }
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- startTest();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public static void main(String[] args) throws InterruptedException
- {
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
-
- final PerfConsumer cons = new PerfConsumer(scriptId + i);
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
-
- }
- testCompleted.await();
- System.out.println("Consumers have completed the test......\n");
- }
-} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
deleted file mode 100644
index ac6129ab68..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.thread.Threading;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
- */
-public class PerfProducer extends PerfBase
-{
- private static long SEC = 60000;
-
- MessageProducer producer;
- Message msg;
- Object payload;
- List<Object> payloads;
- boolean cacheMsg = false;
- boolean randomMsgSize = false;
- boolean durable = false;
- Random random;
- int msgSizeRange = 1024;
- boolean rateLimitProducer = false;
- double rateFactor = 0.4;
- double rate = 0.0;
-
- public PerfProducer(String prefix)
- {
- super(prefix);
- System.out.println("Producer ID : " + id);
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- durable = params.isDurable();
- rateLimitProducer = params.getRate() > 0 ? true : false;
- if (rateLimitProducer)
- {
- System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
- }
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(params.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (params.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(params.getMsgSize());
- }
-
- producer = session.createProducer(dest);
- System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- sendMessageToController(m);
- }
-
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
-
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer: " + id + " Warming up......");
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
- {
- producer.send(getNextMessage());
- }
- sendEndMessage();
-
- if (params.isTransacted())
- {
- session.commit();
- }
- }
-
- public void startTest() throws Exception
- {
- resetCounters();
- receiveFromController(OPCode.PRODUCER_START);
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long limit = (long)(params.getRate() * rateFactor); // in msecs
- long timeLimit = (long)(SEC * rateFactor); // in msecs
-
- long start = Clock.getTime(); // defaults to nano secs
- long interval = start;
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setLongProperty(TIMESTAMP, Clock.getTime());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
-
- if (rateLimitProducer && i%limit == 0)
- {
- long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
- if (elapsed < timeLimit)
- {
- Thread.sleep(elapsed);
- }
- interval = Clock.getTime();
-
- }
- }
- sendEndMessage();
- if ( transacted)
- {
- session.commit();
- }
- long time = Clock.getTime() - start;
- rate = (double)count*Clock.convertToSecs()/(double)time;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
- }
-
- public void resetCounters()
- {
-
- }
-
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- producer.send(msg);
- }
-
- public void sendResults() throws Exception
- {
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, rate);
- sendMessageToController(msg);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Producer: " + id + " starting a new iteration ......\n");
- startTest();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public void startControllerIfNeeded()
- {
- if (!params.isExternalController())
- {
- final PerfTestController controller = new PerfTestController();
- Runnable r = new Runnable()
- {
- public void run()
- {
- controller.run();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating controller thread",e);
- }
- t.start();
- }
- }
-
-
- public static void main(String[] args) throws InterruptedException
- {
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final PerfProducer prod = new PerfProducer(scriptId + i);
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- System.out.println("Producers have completed the test......");
- }
-} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
new file mode 100644
index 0000000000..02f011f1b9
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.apache.qpid.tools.report.BasicReporter;
+import org.apache.qpid.tools.report.Reporter;
+import org.apache.qpid.tools.report.Statistics.Throughput;
+import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QpidReceive implements MessageListener
+{
+ private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
+ private final CountDownLatch testCompleted = new CountDownLatch(1);
+
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageConsumer consumer;
+ private boolean transacted = false;
+ private boolean isRollback = false;
+ private int txSize = 0;
+ private int rollbackFrequency = 0;
+ private int ackFrequency = 0;
+ private int expected = 0;
+ private int received = 0;
+ private Reporter report;
+ private TestConfiguration config;
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
+
+ public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
+
+ public void setUp() throws Exception
+ {
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else if (config.getAckFrequency() > 0)
+ {
+ session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+ System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
+
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
+ isRollback = config.getRollbackFrequency() > 0;
+ rollbackFrequency = config.getRollbackFrequency();
+ ackFrequency = config.getAckFrequency();
+ }
+
+ public void resetCounters()
+ {
+ received = 0;
+ expected = 0;
+ report.clear();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage &&
+ TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
+ {
+ testCompleted.countDown();
+ return;
+ }
+
+ received++;
+ report.message(msg);
+
+ if (transacted && (received % txSize == 0))
+ {
+ if (isRollback && (received % rollbackFrequency == 0))
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ else if (ackFrequency > 0)
+ {
+ msg.acknowledge();
+ }
+
+ if (expected >= received)
+ {
+ testCompleted.countDown();
+ }
+
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error when receiving messages",e);
+ }
+
+ }
+
+ public void waitforCompletion(int expected) throws Exception
+ {
+ this.expected = expected;
+ testCompleted.await();
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader()
+ );
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
+ receiver.setUp();
+ receiver.waitforCompletion(config.getMsgCount());
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ receiver.tearDown();
+ }
+
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
new file mode 100644
index 0000000000..c058b83d41
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.tools.TestConfiguration.MessageType;
+import org.apache.qpid.tools.report.BasicReporter;
+import org.apache.qpid.tools.report.Reporter;
+import org.apache.qpid.tools.report.Statistics.Throughput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QpidSend
+{
+ private Connection con;
+ private Session session;
+ private Destination dest;
+ private MessageProducer producer;
+ private MessageType msgType;
+ private Message msg;
+ private Object payload;
+ private List<Object> payloads;
+ private boolean cacheMsg = false;
+ private boolean randomMsgSize = false;
+ private boolean durable = false;
+ private Random random;
+ private int msgSizeRange = 1024;
+ private int totalMsgCount = 0;
+ private boolean rateLimitProducer = false;
+ private boolean transacted = false;
+ private int txSize = 0;
+
+ private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
+ Reporter report;
+ TestConfiguration config;
+
+ public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
+ {
+ this(report,config, con, dest, UUID.randomUUID().toString());
+ }
+
+ public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
+ {
+ //System.out.println("Producer ID : " + id);
+ this.report = report;
+ this.config = config;
+ this.con = con;
+ this.dest = dest;
+ }
+
+ public void setUp() throws Exception
+ {
+ if (config.isTransacted())
+ {
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ durable = config.isDurable();
+ rateLimitProducer = config.getSendRate() > 0 ? true : false;
+ if (_logger.isDebugEnabled() && rateLimitProducer)
+ {
+ System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
+ }
+
+ transacted = config.isTransacted();
+ txSize = config.getTransactionSize();
+
+ msgType = MessageType.getType(config.getMessageType());
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (config.isCacheMessage())
+ {
+ cacheMsg = true;
+ msg = createMessage(createPayload(config.getMsgSize()));
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else if (config.isRandomMsgSize())
+ {
+ random = new Random(20080921);
+ randomMsgSize = true;
+ msgSizeRange = config.getMsgSize();
+ payloads = new ArrayList<Object>(msgSizeRange);
+
+ for (int i=0; i < msgSizeRange; i++)
+ {
+ payloads.add(createPayload(i));
+ }
+ }
+ else
+ {
+ payload = createPayload(config.getMsgSize());
+ }
+
+ producer = session.createProducer(dest);
+ if (_logger.isDebugEnabled())
+ {
+ System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
+ }
+ producer.setDisableMessageID(config.isDisableMessageID());
+ producer.setDisableMessageTimestamp(config.isDisableTimestamp());
+ }
+
+ Object createPayload(int size)
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return MessageFactory.createMessagePayload(size);
+ }
+ else
+ {
+ return MessageFactory.createMessagePayload(size).getBytes();
+ }
+ }
+
+ Message createMessage(Object payload) throws Exception
+ {
+ if (msgType == MessageType.TEXT)
+ {
+ return session.createTextMessage((String)payload);
+ }
+ else
+ {
+ BytesMessage m = session.createBytesMessage();
+ m.writeBytes((byte[])payload);
+ return m;
+ }
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (cacheMsg)
+ {
+ return msg;
+ }
+ else
+ {
+ Message m;
+
+ if (!randomMsgSize)
+ {
+ m = createMessage(payload);
+ }
+ else
+ {
+ m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
+ }
+ m.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ return m;
+ }
+ }
+
+ public void commit() throws Exception
+ {
+ session.commit();
+ }
+
+ public void send() throws Exception
+ {
+ send(config.getMsgCount());
+ }
+
+ public void send(int count) throws Exception
+ {
+ int sendRate = config.getSendRate();
+ if (rateLimitProducer)
+ {
+ int iterations = count/sendRate;
+ int remainder = count%sendRate;
+ for (int i=0; i < iterations; i++)
+ {
+ long iterationStart = Clock.getTime();
+ sendMessages(sendRate);
+ long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs();
+ long diff = Clock.SEC - elapsed;
+ if (diff > 0)
+ {
+ // We have sent more messages in a sec than specified by the rate.
+ Thread.sleep(diff);
+ }
+ }
+ sendMessages(remainder);
+ }
+ else
+ {
+ sendMessages(count);
+ }
+ }
+
+ private void sendMessages(int count) throws Exception
+ {
+ boolean isTimestamp = config.isReportLatency();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ if (isTimestamp)
+ {
+ msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime());
+ }
+ producer.send(msg);
+ report.message(msg);
+ totalMsgCount++;
+
+ if ( transacted && ((totalMsgCount) % txSize == 0))
+ {
+ session.commit();
+ }
+ }
+ }
+
+ public void resetCounters()
+ {
+ totalMsgCount = 0;
+ report.clear();
+ }
+
+ public void sendEndMessage() throws Exception
+ {
+ Message msg = session.createMessage();
+ msg.setBooleanProperty(TestConfiguration.EOS, true);
+ producer.send(msg);
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ TestConfiguration config = new JVMArgConfiguration();
+ Reporter reporter = new BasicReporter(Throughput.class,
+ System.out,
+ config.reportEvery(),
+ config.isReportHeader()
+ );
+ Destination dest = AMQDestination.createDestination(config.getAddress());
+ QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
+ sender.setUp();
+ sender.send();
+ if (config.getSendEOS() > 0)
+ {
+ sender.sendEndMessage();
+ }
+ if (config.isReportTotal())
+ {
+ reporter.report();
+ }
+ sender.tearDown();
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
new file mode 100644
index 0000000000..7f7df0e5e6
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.text.DecimalFormat;
+
+import javax.jms.Connection;
+
+public interface TestConfiguration
+{
+ enum MessageType {
+ BYTES, TEXT, MAP, OBJECT;
+
+ public static MessageType getType(String s) throws Exception
+ {
+ if ("text".equalsIgnoreCase(s))
+ {
+ return TEXT;
+ }
+ else if ("bytes".equalsIgnoreCase(s))
+ {
+ return BYTES;
+ }
+ /*else if ("map".equalsIgnoreCase(s))
+ {
+ return MAP;
+ }
+ else if ("object".equalsIgnoreCase(s))
+ {
+ return OBJECT;
+ }*/
+ else
+ {
+ throw new Exception("Unsupported message type");
+ }
+ }
+ };
+
+ public final static String TIMESTAMP = "ts";
+
+ public final static String EOS = "eos";
+
+ public final static String SEQUENCE_NUMBER = "sn";
+
+ public String getUrl();
+
+ public String getHost();
+
+ public int getPort();
+
+ public String getAddress();
+
+ public int getAckMode();
+
+ public int getMsgCount();
+
+ public int getMsgSize();
+
+ public int getRandomMsgSizeStartFrom();
+
+ public boolean isDurable();
+
+ public boolean isTransacted();
+
+ public int getTransactionSize();
+
+ public int getWarmupCount();
+
+ public boolean isCacheMessage();
+
+ public boolean isDisableMessageID();
+
+ public boolean isDisableTimestamp();
+
+ public boolean isRandomMsgSize();
+
+ public String getMessageType();
+
+ public boolean isPrintStdDev();
+
+ public int getSendRate();
+
+ public boolean isExternalController();
+
+ public boolean isUseUniqueDests();
+
+ public int getAckFrequency();
+
+ public Connection createConnection() throws Exception;
+
+ public DecimalFormat getDecimalFormat();
+
+ public int reportEvery();
+
+ public boolean isReportTotal();
+
+ public boolean isReportHeader();
+
+ public boolean isReportLatency();
+
+ public int getSendEOS();
+
+ public int getConnectionCount();
+
+ public int getRollbackFrequency();
+
+ public boolean isPrintHeaders();
+} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
deleted file mode 100644
index d73be0181b..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import javax.jms.Session;
-
-public class TestParams
-{
- /*
- * By default the connection URL is used.
- * This allows a user to easily specify a fully fledged URL any given property.
- * Ex. SSL parameters
- *
- * By providing a host & port allows a user to simply override the URL.
- * This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
- */
- private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
- private String host = "";
-
- private int port = -1;
-
- private String address = "queue; {create : always}";
-
- private int msg_size = 1024;
-
- private int random_msg_size_start_from = 1;
-
- private boolean cacheMessage = false;
-
- private boolean disableMessageID = false;
-
- private boolean disableTimestamp = false;
-
- private boolean durable = false;
-
- private boolean transacted = false;
-
- private int transaction_size = 1000;
-
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- private int msg_count = 10;
-
- private int warmup_count = 1;
-
- private boolean random_msg_size = false;
-
- private String msgType = "bytes";
-
- private boolean printStdDev = false;
-
- private long rate = -1;
-
- private boolean externalController = false;
-
- private boolean useUniqueDest = false; // useful when using multiple connections.
-
- public TestParams()
- {
-
- url = System.getProperty("url",url);
- host = System.getProperty("host","");
- port = Integer.getInteger("port", -1);
- address = System.getProperty("address",address);
-
- msg_size = Integer.getInteger("msg_size", 1024);
- cacheMessage = Boolean.getBoolean("cache_msg");
- disableMessageID = Boolean.getBoolean("disableMessageID");
- disableTimestamp = Boolean.getBoolean("disableTimestamp");
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- transaction_size = Integer.getInteger("trans_size",1000);
- ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg_count",msg_count);
- warmup_count = Integer.getInteger("warmup_count",warmup_count);
- random_msg_size = Boolean.getBoolean("random_msg_size");
- msgType = System.getProperty("msg_type","bytes");
- printStdDev = Boolean.getBoolean("print_std_dev");
- rate = Long.getLong("rate",-1);
- externalController = Boolean.getBoolean("ext_controller");
- useUniqueDest = Boolean.getBoolean("use_unique_dest");
- random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
- }
-
- public String getUrl()
- {
- return url;
- }
-
- public String getHost()
- {
- return host;
- }
-
- public int getPort()
- {
- return port;
- }
-
- public String getAddress()
- {
- return address;
- }
-
- public int getAckMode()
- {
- return ack_mode;
- }
-
- public int getMsgCount()
- {
- return msg_count;
- }
-
- public int getMsgSize()
- {
- return msg_size;
- }
-
- public int getRandomMsgSizeStartFrom()
- {
- return random_msg_size_start_from;
- }
-
- public boolean isDurable()
- {
- return durable;
- }
-
- public boolean isTransacted()
- {
- return transacted;
- }
-
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
- public boolean isRandomMsgSize()
- {
- return random_msg_size;
- }
-
- public String getMessageType()
- {
- return msgType;
- }
-
- public boolean isPrintStdDev()
- {
- return printStdDev;
- }
-
- public long getRate()
- {
- return rate;
- }
-
- public boolean isExternalController()
- {
- return externalController;
- }
-
- public void setAddress(String addr)
- {
- address = addr;
- }
-
- public boolean isUseUniqueDests()
- {
- return useUniqueDest;
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
new file mode 100644
index 0000000000..a9896c1d4e
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools.report;
+
+import java.io.PrintStream;
+import java.lang.reflect.Constructor;
+
+import javax.jms.Message;
+
+public class BasicReporter implements Reporter
+{
+ PrintStream out;
+ int batchSize = 0;
+ int batchCount = 0;
+ boolean headerPrinted = false;
+ protected Statistics overall;
+ Statistics batch;
+ Constructor<? extends Statistics> statCtor;
+
+ public BasicReporter(Class<? extends Statistics> clazz, PrintStream out,
+ int batchSize, boolean printHeader) throws Exception
+ {
+ this.out = out;
+ this.batchSize = batchSize;
+ this.headerPrinted = !printHeader;
+ statCtor = clazz.getConstructor();
+ overall = statCtor.newInstance();
+ if (batchSize > 0)
+ {
+ batch = statCtor.newInstance();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.report.Reporter#message(javax.jms.Message)
+ */
+ @Override
+ public void message(Message msg)
+ {
+ overall.message(msg);
+ if (batchSize > 0)
+ {
+ batch.message(msg);
+ if (++batchCount == batchSize)
+ {
+ if (!headerPrinted)
+ {
+ header();
+ }
+ batch.report(out);
+ batch.clear();
+ batchCount = 0;
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.report.Reporter#report()
+ */
+ @Override
+ public void report()
+ {
+ if (!headerPrinted)
+ {
+ header();
+ }
+ overall.report(out);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.report.Reporter#header()
+ */
+ @Override
+ public void header()
+ {
+ headerPrinted = true;
+ overall.header(out);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.tools.report.Reporter#log()
+ */
+ @Override
+ public void log(String s)
+ {
+ // NOOP
+ }
+
+ @Override
+ public void clear()
+ {
+ batch.clear();
+ overall.clear();
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
new file mode 100644
index 0000000000..e9bf7100c1
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools.report;
+
+import java.io.PrintStream;
+
+import org.apache.qpid.tools.report.Statistics.Throughput;
+import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
+
+public class MercuryReporter extends BasicReporter
+{
+ MercuryStatistics stats;
+
+ public MercuryReporter(Class<? extends MercuryStatistics> clazz, PrintStream out,
+ int batchSize, boolean printHeader) throws Exception
+ {
+ super(clazz, out, batchSize, printHeader);
+ stats = (MercuryStatistics)overall;
+ }
+
+ public double getRate()
+ {
+ return stats.getRate();
+ }
+
+ public double getAvgLatency()
+ {
+ return stats.getAvgLatency();
+ }
+
+ public double getStdDev()
+ {
+ return stats.getStdDev();
+ }
+
+ public double getMinLatency()
+ {
+ return stats.getMinLatency();
+ }
+
+ public double getMaxLatency()
+ {
+ return stats.getMaxLatency();
+ }
+
+ public int getSampleSize()
+ {
+ return stats.getSampleSize();
+ }
+
+ public interface MercuryStatistics extends Statistics
+ {
+ public double getRate();
+ public long getMinLatency();
+ public long getMaxLatency();
+ public double getAvgLatency();
+ public double getStdDev();
+ public int getSampleSize();
+ }
+
+ public static class MercuryThroughput extends Throughput implements MercuryStatistics
+ {
+ double rate = 0;
+
+ @Override
+ public void report(PrintStream out)
+ {
+ long elapsed = System.currentTimeMillis() - start;
+ rate = (double)messages/(double)elapsed;
+ }
+
+ @Override
+ public void clear()
+ {
+ super.clear();
+ rate = 0;
+ }
+
+ public double getRate()
+ {
+ return rate;
+ }
+
+ public int getSampleSize()
+ {
+ return messages;
+ }
+
+ public long getMinLatency() { return 0; }
+ public long getMaxLatency() { return 0; }
+ public double getAvgLatency(){ return 0; }
+ public double getStdDev(){ return 0; }
+
+ }
+
+ public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics
+ {
+ double rate = 0;
+ double avgLatency = 0;
+ double stdDev;
+
+ @Override
+ public void report(PrintStream out)
+ {
+ long elapsed = System.currentTimeMillis() - start;
+ rate = (double)messages/(double)elapsed;
+ avgLatency = totalLatency/(double)sampleCount;
+ }
+
+ @Override
+ public void clear()
+ {
+ super.clear();
+ rate = 0;
+ avgLatency = 0;
+ }
+
+ public double getRate()
+ {
+ return rate;
+ }
+
+ public long getMinLatency()
+ {
+ return minLatency;
+ }
+
+ public long getMaxLatency()
+ {
+ return maxLatency;
+ }
+
+ public double getAvgLatency()
+ {
+ return avgLatency;
+ }
+
+ public double getStdDev()
+ {
+ return stdDev;
+ }
+
+ public int getSampleSize()
+ {
+ return messages;
+ }
+ }
+
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
new file mode 100644
index 0000000000..5e481458be
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools.report;
+
+import javax.jms.Message;
+
+public interface Reporter
+{
+
+ public void message(Message msg);
+
+ public void report();
+
+ public void header();
+
+ // Will be used by some reporters to print statements which are greped by
+ // scripts. Example see java/tools/bin/perf-report
+ public void log(String s);
+
+ public void clear();
+
+} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
new file mode 100644
index 0000000000..73efd1f1e0
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools.report;
+
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+
+import javax.jms.Message;
+
+public interface Statistics
+{
+ public void message(Message msg);
+ public void report(PrintStream out);
+ public void header(PrintStream out);
+ public void clear();
+
+ static class Throughput implements Statistics
+ {
+ DecimalFormat df = new DecimalFormat("###.##");
+ int messages = 0;
+ long start = 0;
+ boolean started = false;
+
+ @Override
+ public void message(Message msg)
+ {
+ ++messages;
+ if (!started)
+ {
+ start = System.currentTimeMillis();
+ started = true;
+ }
+ }
+
+ @Override
+ public void report(PrintStream out)
+ {
+ long elapsed = System.currentTimeMillis() - start;
+ out.print(df.format((double)messages/(double)elapsed));
+ }
+
+ @Override
+ public void header(PrintStream out)
+ {
+ out.print("tp(m/s)");
+ }
+
+ public void clear()
+ {
+ messages = 0;
+ start = 0;
+ started = false;
+ }
+ }
+
+ static class ThroughputAndLatency extends Throughput
+ {
+ long minLatency = Long.MAX_VALUE;
+ long maxLatency = Long.MIN_VALUE;
+ double totalLatency = 0;
+ int sampleCount = 0;
+
+ @Override
+ public void message(Message msg)
+ {
+ super.message(msg);
+ try
+ {
+ long ts = msg.getLongProperty("ts");
+ long latency = System.currentTimeMillis() - ts;
+ minLatency = Math.min(latency, minLatency);
+ maxLatency = Math.min(latency, maxLatency);
+ totalLatency = totalLatency + latency;
+ sampleCount++;
+ }
+ catch(Exception e)
+ {
+ System.out.println("Error calculating latency");
+ }
+ }
+
+ @Override
+ public void report(PrintStream out)
+ {
+ super.report(out);
+ double avgLatency = totalLatency/(double)sampleCount;
+ out.append('\t')
+ .append(String.valueOf(minLatency))
+ .append('\t')
+ .append(String.valueOf(maxLatency))
+ .append('\t')
+ .append(df.format(avgLatency));
+
+ out.flush();
+ }
+
+ @Override
+ public void header(PrintStream out)
+ {
+ super.header(out);
+ out.append('\t')
+ .append("l-min")
+ .append('\t')
+ .append("l-max")
+ .append('\t')
+ .append("l-avg");
+
+ out.flush();
+ }
+
+ public void clear()
+ {
+ super.clear();
+ minLatency = 0;
+ maxLatency = 0;
+ totalLatency = 0;
+ sampleCount = 0;
+ }
+ }
+
+}