diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
| commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
| tree | 1391da89470593209466df68c0b40b89c14963b1 /java/tools | |
| parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
| download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools')
21 files changed, 2012 insertions, 1374 deletions
diff --git a/java/tools/bin/perf-report b/java/tools/bin/jms-quick-perf-report index 7de3f2b602..7de3f2b602 100755 --- a/java/tools/bin/perf-report +++ b/java/tools/bin/jms-quick-perf-report diff --git a/java/tools/bin/controller b/java/tools/bin/mercury-controller index fab8614039..fab8614039 100644 --- a/java/tools/bin/controller +++ b/java/tools/bin/mercury-controller diff --git a/java/tools/bin/start-consumers b/java/tools/bin/mercury-start-consumers index c71fc0c21f..c71fc0c21f 100644 --- a/java/tools/bin/start-consumers +++ b/java/tools/bin/mercury-start-consumers diff --git a/java/tools/bin/start-producers b/java/tools/bin/mercury-start-producers index 7ba0286f7c..7ba0286f7c 100644 --- a/java/tools/bin/start-producers +++ b/java/tools/bin/mercury-start-producers diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java index 979d2ef76f..4e79dd62a8 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -27,6 +27,8 @@ package org.apache.qpid.tools; public class Clock { + public final static long SEC = 60000; + private static Precision precision; private static long offset = -1; // in nano secs diff --git a/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java new file mode 100644 index 0000000000..c6abdf6c84 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java @@ -0,0 +1,411 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; + +public class JVMArgConfiguration implements TestConfiguration +{ + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; + + private String address = "queue; {create : always}"; + + private int msg_size = 1024; + + private int random_msg_size_start_from = 1; + + private boolean cacheMessage = false; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private int transaction_size = 0; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + private boolean random_msg_size = false; + + private String msgType = "bytes"; + + private boolean printStdDev = false; + + private int sendRate = 0; + + private boolean externalController = false; + + private boolean useUniqueDest = false; // useful when using multiple connections. + + private int ackFrequency = 100; + + private DecimalFormat df = new DecimalFormat("###.##"); + + private int reportEvery = 0; + + private boolean isReportTotal = false; + + private boolean isReportHeader = true; + + private boolean isReportLatency = false; + + private int sendEOS = 0; + + private int connectionCount = 1; + + private int rollbackFrequency = 0; + + private boolean printHeaders; + + public JVMArgConfiguration() + { + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address",address); + + msg_size = Integer.getInteger("msg-size", 1024); + cacheMessage = Boolean.getBoolean("cache-msg"); + disableMessageID = Boolean.getBoolean("disable-message-id"); + disableTimestamp = Boolean.getBoolean("disable-timestamp"); + durable = Boolean.getBoolean("durable"); + transaction_size = Integer.getInteger("tx",1000); + ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg-count",msg_count); + warmup_count = Integer.getInteger("warmup-count",warmup_count); + random_msg_size = Boolean.getBoolean("random-msg-size"); + msgType = System.getProperty("msg-type","bytes"); + printStdDev = Boolean.getBoolean("print-std-dev"); + sendRate = Integer.getInteger("rate",0); + externalController = Boolean.getBoolean("ext-controller"); + useUniqueDest = Boolean.getBoolean("use-unique-dest"); + random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); + reportEvery = Integer.getInteger("report-every"); + isReportTotal = Boolean.getBoolean("report-total"); + isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); + isReportLatency = Boolean.getBoolean("report-latency"); + sendEOS = Integer.getInteger("send-eos"); + connectionCount = Integer.getInteger("con_count",1); + ackFrequency = Integer.getInteger("ack-frequency"); + rollbackFrequency = Integer.getInteger("rollback-frequency"); + printHeaders = Boolean.getBoolean("print-headers"); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getUrl() + */ + @Override + public String getUrl() + { + return url; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getHost() + */ + @Override + public String getHost() + { + return host; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getPort() + */ + @Override + public int getPort() + { + return port; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAddress() + */ + @Override + public String getAddress() + { + return address; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckMode() + */ + @Override + public int getAckMode() + { + return ack_mode; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() + */ + @Override + public int getMsgCount() + { + return msg_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() + */ + @Override + public int getMsgSize() + { + return msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() + */ + @Override + public int getRandomMsgSizeStartFrom() + { + return random_msg_size_start_from; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDurable() + */ + @Override + public boolean isDurable() + { + return durable; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isTransacted() + */ + @Override + public boolean isTransacted() + { + return transaction_size > 0; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() + */ + @Override + public int getTransactionSize() + { + return transaction_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() + */ + @Override + public int getWarmupCount() + { + return warmup_count; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() + */ + @Override + public boolean isCacheMessage() + { + return cacheMessage; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() + */ + @Override + public boolean isDisableMessageID() + { + return disableMessageID; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() + */ + @Override + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() + */ + @Override + public boolean isRandomMsgSize() + { + return random_msg_size; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getMessageType() + */ + @Override + public String getMessageType() + { + return msgType; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() + */ + @Override + public boolean isPrintStdDev() + { + return printStdDev; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getSendRate() + */ + @Override + public int getSendRate() + { + return sendRate; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isExternalController() + */ + @Override + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() + */ + @Override + public boolean isUseUniqueDests() + { + return useUniqueDest; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() + */ + @Override + public int getAckFrequency() + { + return ackFrequency; + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#createConnection() + */ + @Override + public Connection createConnection() throws Exception + { + if (getHost().equals("") || getPort() == -1) + { + return new AMQConnection(getUrl()); + } + else + { + return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() + */ + @Override + public DecimalFormat getDecimalFormat() + { + return df; + } + + @Override + public int reportEvery() + { + return reportEvery; + } + + @Override + public boolean isReportTotal() + { + return isReportTotal; + } + + @Override + public boolean isReportHeader() + { + return isReportHeader; + } + + @Override + public boolean isReportLatency() + { + return isReportLatency; + } + + @Override + public int getSendEOS() + { + return sendEOS; + } + + @Override + public int getConnectionCount() + { + return connectionCount; + } + + @Override + public int getRollbackFrequency() + { + return rollbackFrequency; + } + + @Override + public boolean isPrintHeaders() + { + return printHeaders; + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java deleted file mode 100644 index 16149d17c9..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.FileOutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Latency test sends an x number of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y number of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * It is important to have a sufficiently large number for the warmup count to - * ensure the system is in steady state before the test is started. - * - * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) - * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 - * - * The idea is to get a latency sample for the system once it achieves steady state. - * - */ - -public class LatencyTest extends PerfBase implements MessageListener -{ - private MessageProducer producer; - private MessageConsumer consumer; - private Message msg; - private byte[] payload; - private long maxLatency = 0; - private long minLatency = Long.MAX_VALUE; - private long totalLatency = 0; // to calculate avg latency. - private int rcvdMsgCount = 0; - private double stdDev = 0; - private double avgLatency = 0; - private boolean warmup_mode = true; - private boolean transacted = false; - private int transSize = 0; - - private final List<Long> latencies; - private final Lock lock = new ReentrantLock(); - private final Condition warmedUp; - private final Condition testCompleted; - - public LatencyTest() - { - super(""); - warmedUp = lock.newCondition(); - testCompleted = lock.newCondition(); - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - latencies = new ArrayList <Long>(params.getMsgCount()); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(params.isDurable()? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (params.isCacheMessage()) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - ((BytesMessage)msg).writeBytes(payload); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - int count = params.getWarmupCount(); - for (int i=0; i < count; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - try - { - lock.lock(); - warmedUp.await(); - } - finally - { - lock.unlock(); - } - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - if (warmup_mode) - { - warmup_mode = false; - try - { - lock.lock(); - warmedUp.signal(); - } - finally - { - lock.unlock(); - } - } - else - { - computeStats(); - } - } - else if (!warmup_mode) - { - long time = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = time - msg.getJMSTimestamp(); - latencies.add(latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - private void computeStats() - { - avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double sigma = 0; - - for (long latency: latencies) - { - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - sigma = sigma + Math.pow(latency - avgLatency,2); - } - - stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); - - try - { - lock.lock(); - testCompleted.signal(); - } - finally - { - lock.unlock(); - } - } - - public void writeToFile() throws Exception - { - String fileName = System.getProperty("file"); - PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); - for (long latency: latencies) - { - writer.println(String.valueOf(latency)); - } - writer.flush(); - writer.close(); - } - - public void printToConsole() - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Standard Deviation : "). - append(df.format(stdDev)). - append(" ms").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % transSize == 0)) - { - session.commit(); - } - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - if (params.isTransacted()) - { - session.commit(); - } - } - - public void tearDown() throws Exception - { - try - { - lock.lock(); - testCompleted.await(); - } - finally - { - lock.unlock(); - } - - producer.close(); - consumer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() - { - public void run() - { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); - } - t.start(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java index 121e94cea1..097b021b3e 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java @@ -21,7 +21,6 @@ package org.apache.qpid.tools; import java.net.InetAddress; -import java.text.DecimalFormat; import java.util.UUID; import javax.jms.Connection; @@ -32,14 +31,17 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.messaging.Address; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class PerfBase +public class MercuryBase { + private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class); + public final static String CODE = "CODE"; public final static String ID = "ID"; public final static String REPLY_ADDR = "REPLY_ADDR"; @@ -54,14 +56,13 @@ public class PerfBase String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - TestParams params; + TestConfiguration config; Connection con; Session session; Session controllerSession; Destination dest; Destination myControlQueue; Destination controllerQueue; - DecimalFormat df = new DecimalFormat("###.##"); String id; String myControlQueueAddr; @@ -69,7 +70,8 @@ public class PerfBase MessageConsumer receiveFromController; String prefix = ""; - enum OPCode { + enum OPCode + { REGISTER_CONSUMER, REGISTER_PRODUCER, PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, CONSUMER_READY, PRODUCER_READY, @@ -79,39 +81,11 @@ public class PerfBase CONTINUE_TEST, STOP_TEST }; - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - MessageType msgType = MessageType.BYTES; - public PerfBase(String prefix) + public MercuryBase(TestConfiguration config,String prefix) { - params = new TestParams(); + this.config = config; String host = ""; try { @@ -127,25 +101,16 @@ public class PerfBase public void setUp() throws Exception { - if (params.getHost().equals("") || params.getPort() == -1) - { - con = new AMQConnection(params.getUrl()); - } - else - { - con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); - } + con = config.createConnection(); con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR); myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(params.getMessageType()); - System.out.println("Using " + msgType + " messages"); + msgType = MessageType.getType(config.getMessageType()); + _logger.debug("Using " + msgType + " messages"); sendToController = controllerSession.createProducer(controllerQueue); receiveFromController = controllerSession.createConsumer(myControlQueue); @@ -153,11 +118,11 @@ public class PerfBase private Destination createDestination() throws Exception { - if (params.isUseUniqueDests()) + if (config.isUseUniqueDests()) { - System.out.println("Prefix : " + prefix); - Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + _logger.debug("Prefix : " + prefix); + Address addr = Address.parse(config.getAddress()); + AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress()); int type = ((AMQSession_0_10)session).resolveAddressType(temp); if ( type == AMQDestination.TOPIC_TYPE) @@ -171,11 +136,11 @@ public class PerfBase System.out.println("Setting name : " + addr); } - return new AMQAnyDestination(addr); + return AMQDestination.createDestination(addr.toString()); } else { - return new AMQAnyDestination(params.getAddress()); + return AMQDestination.createDestination(config.getAddress()); } } @@ -190,7 +155,7 @@ public class PerfBase { MapMessage m = (MapMessage)receiveFromController.receive(); OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); + _logger.debug("Received Code : " + code); if (expected != code) { throw new Exception("Expected OPCode : " + expected + " but received : " + code); @@ -202,7 +167,7 @@ public class PerfBase { MapMessage m = (MapMessage)receiveFromController.receive(); OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); + _logger.debug("Received Code : " + code); return (code == OPCode.CONTINUE_TEST); } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java new file mode 100644 index 0000000000..b35adc45d6 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java @@ -0,0 +1,231 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.tools.report.MercuryReporter; +import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * when the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * + * Throughput + * =========== + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class MercuryConsumerController extends MercuryBase +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class); + MercuryReporter reporter; + TestConfiguration config; + QpidReceive receiver; + + public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix) + { + super(config,prefix); + this.reporter = reporter; + if (_logger.isInfoEnabled()) + { + _logger.info("Consumer ID : " + id); + } + } + + public void setUp() throws Exception + { + super.setUp(); + receiver = new QpidReceive(reporter,config, con,dest); + receiver.setUp(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); + sendMessageToController(m); + } + + public void warmup()throws Exception + { + receiveFromController(OPCode.CONSUMER_STARTWARMUP); + receiver.waitforCompletion(config.getWarmupCount()); + + // It's more realistic for the consumer to signal this. + MapMessage m1 = controllerSession.createMapMessage(); + m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m1); + + MapMessage m2 = controllerSession.createMapMessage(); + m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m2); + } + + public void runReceiver() throws Exception + { + if (_logger.isInfoEnabled()) + { + _logger.info("Consumer: " + id + " Starting iteration......" + "\n"); + } + resetCounters(); + receiver.waitforCompletion(config.getMsgCount()); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); + } + + public void resetCounters() + { + reporter.clear(); + } + + public void sendResults() throws Exception + { + receiveFromController(OPCode.CONSUMER_STOP); + reporter.report(); + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); + m.setDouble(AVG_LATENCY, reporter.getAvgLatency()); + m.setDouble(MIN_LATENCY, reporter.getMinLatency()); + m.setDouble(MAX_LATENCY, reporter.getMaxLatency()); + m.setDouble(STD_DEV, reporter.getStdDev()); + m.setDouble(CONS_RATE, reporter.getRate()); + m.setLong(MSG_COUNT, reporter.getSampleSize()); + sendMessageToController(m); + + reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString()); + reporter.log(new StringBuilder("Consumer rate : "). + append(config.getDecimalFormat().format(reporter.getRate())). + append(" msg/sec").toString()); + reporter.log(new StringBuilder("Avg Latency : "). + append(config.getDecimalFormat().format(reporter.getAvgLatency())). + append(" ms").toString()); + reporter.log(new StringBuilder("Min Latency : "). + append(config.getDecimalFormat().format(reporter.getMinLatency())). + append(" ms").toString()); + reporter.log(new StringBuilder("Max Latency : "). + append(config.getDecimalFormat().format(reporter.getMaxLatency())). + append(" ms").toString()); + if (config.isPrintStdDev()) + { + reporter.log(new StringBuilder("Std Dev : "). + append(reporter.getStdDev()).toString()); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + runReceiver(); + sendResults(); + nextIteration = continueTest(); + } + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true); + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = config.getConnectionCount(); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) + { + final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i); + Runnable r = new Runnable() + { + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } + testCompleted.await(); + reporter.log("Consumers have completed the test......\n"); + } +}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java new file mode 100644 index 0000000000..02377bb853 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java @@ -0,0 +1,210 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.tools.report.MercuryReporter; +import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * + */ +public class MercuryProducerController extends MercuryBase +{ + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + MercuryReporter reporter; + QpidSend sender; + + public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix) + { + super(config,prefix); + this.reporter = reporter; + System.out.println("Producer ID : " + id); + } + + public void setUp() throws Exception + { + super.setUp(); + sender = new QpidSend(reporter,config, con,dest); + sender.setUp(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); + sendMessageToController(m); + } + + public void warmup()throws Exception + { + receiveFromController(OPCode.PRODUCER_STARTWARMUP); + if (_logger.isInfoEnabled()) + { + _logger.info("Producer: " + id + " Warming up......"); + } + sender.send(config.getWarmupCount()); + sender.sendEndMessage(); + } + + public void runSender() throws Exception + { + resetCounters(); + receiveFromController(OPCode.PRODUCER_START); + sender.send(config.getMsgCount()); + } + + public void resetCounters() + { + sender.resetCounters(); + } + + public void sendResults() throws Exception + { + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); + msg.setDouble(PROD_RATE, reporter.getRate()); + sendMessageToController(msg); + reporter.log(new StringBuilder("Producer rate: "). + append(config.getDecimalFormat().format(reporter.getRate())). + append(" msg/sec"). + toString()); + } + + @Override + public void tearDown() throws Exception + { + sender.tearDown(); + super.tearDown(); + } + + public void run() + { + try + { + setUp(); + warmup(); + boolean nextIteration = true; + while (nextIteration) + { + if(_logger.isInfoEnabled()) + { + _logger.info("=========================================================\n"); + _logger.info("Producer: " + id + " starting a new iteration ......\n"); + } + runSender(); + sendResults(); + nextIteration = continueTest(); + } + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public void startControllerIfNeeded() + { + if (!config.isExternalController()) + { + final MercuryTestController controller = new MercuryTestController(config); + Runnable r = new Runnable() + { + public void run() + { + controller.run(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating controller thread",e); + } + t.start(); + } + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true); + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = config.getConnectionCount(); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) + { + final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() + { + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); + } + testCompleted.await(); + reporter.log("Producers have completed the test......"); + } +}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java index 5fca1fa4bd..8c66a1e44d 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java @@ -33,6 +33,9 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.tools.report.Reporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The Controller coordinates a test run between a number @@ -62,8 +65,10 @@ import org.apache.qpid.client.message.AMQPEncodedMapMessage; * System throughput is calculated as follows * totalMsgCount/(totalTestTime) */ -public class PerfTestController extends PerfBase implements MessageListener +public class MercuryTestController extends MercuryBase implements MessageListener { + private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); + enum TestMode { SINGLE_RUN, TIME_BASED }; TestMode testMode = TestMode.SINGLE_RUN; @@ -102,11 +107,13 @@ public class PerfTestController extends PerfBase implements MessageListener private MessageConsumer consumer; private boolean printStdDev = false; - FileWriter writer; + private FileWriter writer; + private Reporter report; - public PerfTestController() + public MercuryTestController(TestConfiguration config) { - super(""); + super(config,""); + consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); producers = new ConcurrentHashMap<String,MapMessage>(producerCount); @@ -114,7 +121,7 @@ public class PerfTestController extends PerfBase implements MessageListener prodRegistered = new CountDownLatch(producerCount); consReady = new CountDownLatch(consumerCount); prodReady = new CountDownLatch(producerCount); - printStdDev = params.isPrintStdDev(); + printStdDev = config.isPrintStdDev(); testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; } @@ -126,28 +133,28 @@ public class PerfTestController extends PerfBase implements MessageListener writer = new FileWriter("stats-csv.log"); } consumer = controllerSession.createConsumer(controllerQueue); - System.out.println("\nController: " + producerCount + " producers are expected"); - System.out.println("Controller: " + consumerCount + " consumers are expected \n"); + report.log("\nController: " + producerCount + " producers are expected"); + report.log("Controller: " + consumerCount + " consumers are expected \n"); consumer.setMessageListener(this); consRegistered.await(); prodRegistered.await(); - System.out.println("\nController: All producers and consumers have registered......\n"); + report.log("\nController: All producers and consumers have registered......\n"); } public void warmup() throws Exception { - System.out.println("Controller initiating warm up sequence......"); + report.log("Controller initiating warm up sequence......"); sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); prodReady.await(); consReady.await(); - System.out.println("\nController : All producers and consumers are ready to start the test......\n"); + report.log("\nController : All producers and consumers are ready to start the test......\n"); } public void startTest() throws Exception { resetCounters(); - System.out.println("\nController Starting test......"); + report.log("\nController Starting test......"); long start = Clock.getTime(); sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); receivedEndMsg.await(); @@ -200,7 +207,7 @@ public class PerfTestController extends PerfBase implements MessageListener } catch(Exception e) { - System.out.println("Error calculating stats from Consumer : " + conStat); + System.err.println("Error calculating stats from Consumer : " + conStat); } @@ -217,7 +224,7 @@ public class PerfTestController extends PerfBase implements MessageListener } catch(Exception e) { - System.out.println("Error calculating stats from Producer : " + conStat); + System.err.println("Error calculating stats from Producer : " + conStat); } avgSystemLatency = totLatency/consumers.size(); @@ -225,56 +232,56 @@ public class PerfTestController extends PerfBase implements MessageListener avgSystemConsRate = totalConsRate/consumers.size(); avgSystemProdRate = totalProdRate/producers.size(); - System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); } public void printResults() throws Exception { - System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(totalSystemThroughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Consumer rate : "). - append(df.format(avgSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Consumer rate : "). - append(df.format(minSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Consumer rate : "). - append(df.format(maxSystemConsRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg Producer rate : "). - append(df.format(avgSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Producer rate : "). - append(df.format(minSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Producer rate : "). - append(df.format(maxSystemProdRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg System Latency : "). - append(df.format(avgSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min System Latency : "). - append(df.format(minSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Max System Latency : "). - append(df.format(maxSystemLatency)). - append(" ms").toString()); + report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + report.log(new StringBuilder("System Throughput : "). + append(config.getDecimalFormat().format(totalSystemThroughput)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Avg Consumer rate : "). + append(config.getDecimalFormat().format(avgSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Consumer rate : "). + append(config.getDecimalFormat().format(minSystemConsRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Consumer rate : "). + append(config.getDecimalFormat().format(maxSystemConsRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg Producer rate : "). + append(config.getDecimalFormat().format(avgSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Min Producer rate : "). + append(config.getDecimalFormat().format(minSystemProdRate)). + append(" msg/sec").toString()); + report.log(new StringBuilder("Max Producer rate : "). + append(config.getDecimalFormat().format(maxSystemProdRate)). + append(" msg/sec").toString()); + + report.log(new StringBuilder("Avg System Latency : "). + append(config.getDecimalFormat().format(avgSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Min System Latency : "). + append(config.getDecimalFormat().format(minSystemLatency)). + append(" ms").toString()); + report.log(new StringBuilder("Max System Latency : "). + append(config.getDecimalFormat().format(maxSystemLatency)). + append(" ms").toString()); if (printStdDev) { - System.out.println(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev)); + report.log(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev).toString()); } } private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception { - System.out.println("\nController: Sending code " + code); + report.log("\nController: Sending code " + code); MessageProducer tmpProd = controllerSession.createProducer(null); MapMessage msg = controllerSession.createMapMessage(); msg.setInt(CODE, code.ordinal()); @@ -282,11 +289,11 @@ public class PerfTestController extends PerfBase implements MessageListener { if (node.getString(REPLY_ADDR) == null) { - System.out.println("REPLY_ADDR is null " + node); + report.log("REPLY_ADDR is null " + node); } else { - System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); } tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); } @@ -299,16 +306,16 @@ public class PerfTestController extends PerfBase implements MessageListener MapMessage m = (MapMessage)msg; OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("\n---------Controller Received Code : " + code); - System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + report.log("\n---------Controller Received Code : " + code); + report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); switch (code) { case REGISTER_CONSUMER : if (consRegistered.getCount() == 0) { - System.out.println("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); + report.log("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); break; } consumers.put(m.getString(ID),m); @@ -318,8 +325,8 @@ public class PerfTestController extends PerfBase implements MessageListener case REGISTER_PRODUCER : if (prodRegistered.getCount() == 0) { - System.out.println("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); + report.log("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); break; } producers.put(m.getString(ID),m); @@ -403,7 +410,7 @@ public class PerfTestController extends PerfBase implements MessageListener @Override public void tearDown() throws Exception { - System.out.println("Controller: Completed the test......\n"); + report.log("Controller: Completed the test......\n"); if (testMode == TestMode.TIME_BASED) { writer.close(); @@ -416,16 +423,16 @@ public class PerfTestController extends PerfBase implements MessageListener public void writeStatsToFile() throws Exception { writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(df.format(totalSystemThroughput)).append(","); - writer.append(df.format(avgSystemConsRate)).append(","); - writer.append(df.format(minSystemConsRate)).append(","); - writer.append(df.format(maxSystemConsRate)).append(","); - writer.append(df.format(avgSystemProdRate)).append(","); - writer.append(df.format(minSystemProdRate)).append(","); - writer.append(df.format(maxSystemProdRate)).append(","); - writer.append(df.format(avgSystemLatency)).append(","); - writer.append(df.format(minSystemLatency)).append(","); - writer.append(df.format(maxSystemLatency)); + writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(","); + writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(minSystemLatency)).append(","); + writer.append(config.getDecimalFormat().format(maxSystemLatency)); if (printStdDev) { writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); @@ -436,7 +443,8 @@ public class PerfTestController extends PerfBase implements MessageListener public static void main(String[] args) { - PerfTestController controller = new PerfTestController(); + TestConfiguration config = new JVMArgConfiguration(); + MercuryTestController controller = new MercuryTestController(config); controller.run(); } } diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java deleted file mode 100644 index b63892bb51..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.thread.Threading; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * - * Throughput - * =========== - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class PerfConsumer extends PerfBase implements MessageListener -{ - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; - - boolean printStdDev = false; - List<Long> sample; - - final Object lock = new Object(); - - public PerfConsumer(String prefix) - { - super(prefix); - System.out.println("Consumer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) - { - if (msg.getBooleanProperty("End") == true) - { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); - } - msg = consumer.receive(1000); - } - - if (params.isTransacted()) - { - session.commit(); - } - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); - } - - public void startTest() throws Exception - { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); - resetCounters(); - } - - public void resetCounters() - { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) - { - sample = null; - sample = new ArrayList<Long>(params.getMsgCount()); - } - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); - - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); - sendMessageToController(m); - - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); - } - } - - public double calculateStdDev(double mean) - { - double v = 0; - for (double latency: sample) - { - v = v + Math.pow((latency-mean), 2); - } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); - } - - public void onMessage(Message msg) - { - try - { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) - { - ((TextMessage)msg).getText(); - } - - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); - } - else - { - rcvdTime = Clock.getTime(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - - } - testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); - } -}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java deleted file mode 100644 index ac6129ab68..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.thread.Threading; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * - */ -public class PerfProducer extends PerfBase -{ - private static long SEC = 60000; - - MessageProducer producer; - Message msg; - Object payload; - List<Object> payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) - { - super(prefix); - System.out.println("Producer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(params.getMsgSize()); - } - - producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; - - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); - - for (int i=0; i < params.getWarmupCount() -1; i++) - { - producer.send(getNextMessage()); - } - sendEndMessage(); - - if (params.isTransacted()) - { - session.commit(); - } - } - - public void startTest() throws Exception - { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); - } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); - } - - public void resetCounters() - { - - } - - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); - } - - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); - sendMessageToController(msg); - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public void startControllerIfNeeded() - { - if (!params.isExternalController()) - { - final PerfTestController controller = new PerfTestController(); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); - } - } - - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - final PerfProducer prod = new PerfProducer(scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } - testCompleted.await(); - System.out.println("Producers have completed the test......"); - } -}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java new file mode 100644 index 0000000000..02f011f1b9 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java @@ -0,0 +1,181 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.apache.qpid.tools.report.BasicReporter; +import org.apache.qpid.tools.report.Reporter; +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidReceive implements MessageListener +{ + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + private final CountDownLatch testCompleted = new CountDownLatch(1); + + private Connection con; + private Session session; + private Destination dest; + private MessageConsumer consumer; + private boolean transacted = false; + private boolean isRollback = false; + private int txSize = 0; + private int rollbackFrequency = 0; + private int ackFrequency = 0; + private int expected = 0; + private int received = 0; + private Reporter report; + private TestConfiguration config; + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else if (config.getAckFrequency() > 0) + { + session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + isRollback = config.getRollbackFrequency() > 0; + rollbackFrequency = config.getRollbackFrequency(); + ackFrequency = config.getAckFrequency(); + } + + public void resetCounters() + { + received = 0; + expected = 0; + report.clear(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && + TestConfiguration.EOS.equals(((TextMessage)msg).getText())) + { + testCompleted.countDown(); + return; + } + + received++; + report.message(msg); + + if (transacted && (received % txSize == 0)) + { + if (isRollback && (received % rollbackFrequency == 0)) + { + session.rollback(); + } + else + { + session.commit(); + } + } + else if (ackFrequency > 0) + { + msg.acknowledge(); + } + + if (expected >= received) + { + testCompleted.countDown(); + } + + } + catch(Exception e) + { + _logger.error("Error when receiving messages",e); + } + + } + + public void waitforCompletion(int expected) throws Exception + { + this.expected = expected; + testCompleted.await(); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(config.isReportLatency()? ThroughputAndLatency.class : Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); + receiver.setUp(); + receiver.waitforCompletion(config.getMsgCount()); + if (config.isReportTotal()) + { + reporter.report(); + } + receiver.tearDown(); + } + +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java new file mode 100644 index 0000000000..c058b83d41 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.tools.TestConfiguration.MessageType; +import org.apache.qpid.tools.report.BasicReporter; +import org.apache.qpid.tools.report.Reporter; +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidSend +{ + private Connection con; + private Session session; + private Destination dest; + private MessageProducer producer; + private MessageType msgType; + private Message msg; + private Object payload; + private List<Object> payloads; + private boolean cacheMsg = false; + private boolean randomMsgSize = false; + private boolean durable = false; + private Random random; + private int msgSizeRange = 1024; + private int totalMsgCount = 0; + private boolean rateLimitProducer = false; + private boolean transacted = false; + private int txSize = 0; + + private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); + Reporter report; + TestConfiguration config; + + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) + { + this(report,config, con, dest, UUID.randomUUID().toString()); + } + + public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) + { + //System.out.println("Producer ID : " + id); + this.report = report; + this.config = config; + this.con = con; + this.dest = dest; + } + + public void setUp() throws Exception + { + if (config.isTransacted()) + { + session = con.createSession(true, Session.SESSION_TRANSACTED); + } + else + { + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + durable = config.isDurable(); + rateLimitProducer = config.getSendRate() > 0 ? true : false; + if (_logger.isDebugEnabled() && rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); + } + + transacted = config.isTransacted(); + txSize = config.getTransactionSize(); + + msgType = MessageType.getType(config.getMessageType()); + // if message caching is enabled we pre create the message + // else we pre create the payload + if (config.isCacheMessage()) + { + cacheMsg = true; + msg = createMessage(createPayload(config.getMsgSize())); + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else if (config.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = config.getMsgSize(); + payloads = new ArrayList<Object>(msgSizeRange); + + for (int i=0; i < msgSizeRange; i++) + { + payloads.add(createPayload(i)); + } + } + else + { + payload = createPayload(config.getMsgSize()); + } + + producer = session.createProducer(dest); + if (_logger.isDebugEnabled()) + { + System.out.println("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); + } + producer.setDisableMessageID(config.isDisableMessageID()); + producer.setDisableMessageTimestamp(config.isDisableTimestamp()); + } + + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } + } + + protected Message getNextMessage() throws Exception + { + if (cacheMsg) + { + return msg; + } + else + { + Message m; + + if (!randomMsgSize) + { + m = createMessage(payload); + } + else + { + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + } + m.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + return m; + } + } + + public void commit() throws Exception + { + session.commit(); + } + + public void send() throws Exception + { + send(config.getMsgCount()); + } + + public void send(int count) throws Exception + { + int sendRate = config.getSendRate(); + if (rateLimitProducer) + { + int iterations = count/sendRate; + int remainder = count%sendRate; + for (int i=0; i < iterations; i++) + { + long iterationStart = Clock.getTime(); + sendMessages(sendRate); + long elapsed = (Clock.getTime() - iterationStart)*Clock.convertToMiliSecs(); + long diff = Clock.SEC - elapsed; + if (diff > 0) + { + // We have sent more messages in a sec than specified by the rate. + Thread.sleep(diff); + } + } + sendMessages(remainder); + } + else + { + sendMessages(count); + } + } + + private void sendMessages(int count) throws Exception + { + boolean isTimestamp = config.isReportLatency(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + if (isTimestamp) + { + msg.setLongProperty(TestConfiguration.TIMESTAMP, Clock.getTime()); + } + producer.send(msg); + report.message(msg); + totalMsgCount++; + + if ( transacted && ((totalMsgCount) % txSize == 0)) + { + session.commit(); + } + } + } + + public void resetCounters() + { + totalMsgCount = 0; + report.clear(); + } + + public void sendEndMessage() throws Exception + { + Message msg = session.createMessage(); + msg.setBooleanProperty(TestConfiguration.EOS, true); + producer.send(msg); + } + + public void tearDown() throws Exception + { + session.close(); + } + + public static void main(String[] args) throws Exception + { + TestConfiguration config = new JVMArgConfiguration(); + Reporter reporter = new BasicReporter(Throughput.class, + System.out, + config.reportEvery(), + config.isReportHeader() + ); + Destination dest = AMQDestination.createDestination(config.getAddress()); + QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); + sender.setUp(); + sender.send(); + if (config.getSendEOS() > 0) + { + sender.sendEndMessage(); + } + if (config.isReportTotal()) + { + reporter.report(); + } + sender.tearDown(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java new file mode 100644 index 0000000000..7f7df0e5e6 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools; + +import java.text.DecimalFormat; + +import javax.jms.Connection; + +public interface TestConfiguration +{ + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + public final static String TIMESTAMP = "ts"; + + public final static String EOS = "eos"; + + public final static String SEQUENCE_NUMBER = "sn"; + + public String getUrl(); + + public String getHost(); + + public int getPort(); + + public String getAddress(); + + public int getAckMode(); + + public int getMsgCount(); + + public int getMsgSize(); + + public int getRandomMsgSizeStartFrom(); + + public boolean isDurable(); + + public boolean isTransacted(); + + public int getTransactionSize(); + + public int getWarmupCount(); + + public boolean isCacheMessage(); + + public boolean isDisableMessageID(); + + public boolean isDisableTimestamp(); + + public boolean isRandomMsgSize(); + + public String getMessageType(); + + public boolean isPrintStdDev(); + + public int getSendRate(); + + public boolean isExternalController(); + + public boolean isUseUniqueDests(); + + public int getAckFrequency(); + + public Connection createConnection() throws Exception; + + public DecimalFormat getDecimalFormat(); + + public int reportEvery(); + + public boolean isReportTotal(); + + public boolean isReportHeader(); + + public boolean isReportLatency(); + + public int getSendEOS(); + + public int getConnectionCount(); + + public int getRollbackFrequency(); + + public boolean isPrintHeaders(); +}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java deleted file mode 100644 index d73be0181b..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import javax.jms.Session; - -public class TestParams -{ - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - - private String host = ""; - - private int port = -1; - - private String address = "queue; {create : always}"; - - private int msg_size = 1024; - - private int random_msg_size_start_from = 1; - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private boolean transacted = false; - - private int transaction_size = 1000; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - private String msgType = "bytes"; - - private boolean printStdDev = false; - - private long rate = -1; - - private boolean externalController = false; - - private boolean useUniqueDest = false; // useful when using multiple connections. - - public TestParams() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); - - msg_size = Integer.getInteger("msg_size", 1024); - cacheMessage = Boolean.getBoolean("cache_msg"); - disableMessageID = Boolean.getBoolean("disableMessageID"); - disableTimestamp = Boolean.getBoolean("disableTimestamp"); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - transaction_size = Integer.getInteger("trans_size",1000); - ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg_count",msg_count); - warmup_count = Integer.getInteger("warmup_count",warmup_count); - random_msg_size = Boolean.getBoolean("random_msg_size"); - msgType = System.getProperty("msg_type","bytes"); - printStdDev = Boolean.getBoolean("print_std_dev"); - rate = Long.getLong("rate",-1); - externalController = Boolean.getBoolean("ext_controller"); - useUniqueDest = Boolean.getBoolean("use_unique_dest"); - random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); - } - - public String getUrl() - { - return url; - } - - public String getHost() - { - return host; - } - - public int getPort() - { - return port; - } - - public String getAddress() - { - return address; - } - - public int getAckMode() - { - return ack_mode; - } - - public int getMsgCount() - { - return msg_count; - } - - public int getMsgSize() - { - return msg_size; - } - - public int getRandomMsgSizeStartFrom() - { - return random_msg_size_start_from; - } - - public boolean isDurable() - { - return durable; - } - - public boolean isTransacted() - { - return transacted; - } - - public int getTransactionSize() - { - return transaction_size; - } - - public int getWarmupCount() - { - return warmup_count; - } - - public boolean isCacheMessage() - { - return cacheMessage; - } - - public boolean isDisableMessageID() - { - return disableMessageID; - } - - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public boolean isRandomMsgSize() - { - return random_msg_size; - } - - public String getMessageType() - { - return msgType; - } - - public boolean isPrintStdDev() - { - return printStdDev; - } - - public long getRate() - { - return rate; - } - - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - public boolean isUseUniqueDests() - { - return useUniqueDest; - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java new file mode 100644 index 0000000000..a9896c1d4e --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import java.io.PrintStream; +import java.lang.reflect.Constructor; + +import javax.jms.Message; + +public class BasicReporter implements Reporter +{ + PrintStream out; + int batchSize = 0; + int batchCount = 0; + boolean headerPrinted = false; + protected Statistics overall; + Statistics batch; + Constructor<? extends Statistics> statCtor; + + public BasicReporter(Class<? extends Statistics> clazz, PrintStream out, + int batchSize, boolean printHeader) throws Exception + { + this.out = out; + this.batchSize = batchSize; + this.headerPrinted = !printHeader; + statCtor = clazz.getConstructor(); + overall = statCtor.newInstance(); + if (batchSize > 0) + { + batch = statCtor.newInstance(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#message(javax.jms.Message) + */ + @Override + public void message(Message msg) + { + overall.message(msg); + if (batchSize > 0) + { + batch.message(msg); + if (++batchCount == batchSize) + { + if (!headerPrinted) + { + header(); + } + batch.report(out); + batch.clear(); + batchCount = 0; + } + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#report() + */ + @Override + public void report() + { + if (!headerPrinted) + { + header(); + } + overall.report(out); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#header() + */ + @Override + public void header() + { + headerPrinted = true; + overall.header(out); + } + + /* (non-Javadoc) + * @see org.apache.qpid.tools.report.Reporter#log() + */ + @Override + public void log(String s) + { + // NOOP + } + + @Override + public void clear() + { + batch.clear(); + overall.clear(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java new file mode 100644 index 0000000000..e9bf7100c1 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java @@ -0,0 +1,167 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import java.io.PrintStream; + +import org.apache.qpid.tools.report.Statistics.Throughput; +import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; + +public class MercuryReporter extends BasicReporter +{ + MercuryStatistics stats; + + public MercuryReporter(Class<? extends MercuryStatistics> clazz, PrintStream out, + int batchSize, boolean printHeader) throws Exception + { + super(clazz, out, batchSize, printHeader); + stats = (MercuryStatistics)overall; + } + + public double getRate() + { + return stats.getRate(); + } + + public double getAvgLatency() + { + return stats.getAvgLatency(); + } + + public double getStdDev() + { + return stats.getStdDev(); + } + + public double getMinLatency() + { + return stats.getMinLatency(); + } + + public double getMaxLatency() + { + return stats.getMaxLatency(); + } + + public int getSampleSize() + { + return stats.getSampleSize(); + } + + public interface MercuryStatistics extends Statistics + { + public double getRate(); + public long getMinLatency(); + public long getMaxLatency(); + public double getAvgLatency(); + public double getStdDev(); + public int getSampleSize(); + } + + public static class MercuryThroughput extends Throughput implements MercuryStatistics + { + double rate = 0; + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + rate = (double)messages/(double)elapsed; + } + + @Override + public void clear() + { + super.clear(); + rate = 0; + } + + public double getRate() + { + return rate; + } + + public int getSampleSize() + { + return messages; + } + + public long getMinLatency() { return 0; } + public long getMaxLatency() { return 0; } + public double getAvgLatency(){ return 0; } + public double getStdDev(){ return 0; } + + } + + public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics + { + double rate = 0; + double avgLatency = 0; + double stdDev; + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + rate = (double)messages/(double)elapsed; + avgLatency = totalLatency/(double)sampleCount; + } + + @Override + public void clear() + { + super.clear(); + rate = 0; + avgLatency = 0; + } + + public double getRate() + { + return rate; + } + + public long getMinLatency() + { + return minLatency; + } + + public long getMaxLatency() + { + return maxLatency; + } + + public double getAvgLatency() + { + return avgLatency; + } + + public double getStdDev() + { + return stdDev; + } + + public int getSampleSize() + { + return messages; + } + } + +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java new file mode 100644 index 0000000000..5e481458be --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import javax.jms.Message; + +public interface Reporter +{ + + public void message(Message msg); + + public void report(); + + public void header(); + + // Will be used by some reporters to print statements which are greped by + // scripts. Example see java/tools/bin/perf-report + public void log(String s); + + public void clear(); + +}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java new file mode 100644 index 0000000000..73efd1f1e0 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.tools.report; + +import java.io.PrintStream; +import java.text.DecimalFormat; + +import javax.jms.Message; + +public interface Statistics +{ + public void message(Message msg); + public void report(PrintStream out); + public void header(PrintStream out); + public void clear(); + + static class Throughput implements Statistics + { + DecimalFormat df = new DecimalFormat("###.##"); + int messages = 0; + long start = 0; + boolean started = false; + + @Override + public void message(Message msg) + { + ++messages; + if (!started) + { + start = System.currentTimeMillis(); + started = true; + } + } + + @Override + public void report(PrintStream out) + { + long elapsed = System.currentTimeMillis() - start; + out.print(df.format((double)messages/(double)elapsed)); + } + + @Override + public void header(PrintStream out) + { + out.print("tp(m/s)"); + } + + public void clear() + { + messages = 0; + start = 0; + started = false; + } + } + + static class ThroughputAndLatency extends Throughput + { + long minLatency = Long.MAX_VALUE; + long maxLatency = Long.MIN_VALUE; + double totalLatency = 0; + int sampleCount = 0; + + @Override + public void message(Message msg) + { + super.message(msg); + try + { + long ts = msg.getLongProperty("ts"); + long latency = System.currentTimeMillis() - ts; + minLatency = Math.min(latency, minLatency); + maxLatency = Math.min(latency, maxLatency); + totalLatency = totalLatency + latency; + sampleCount++; + } + catch(Exception e) + { + System.out.println("Error calculating latency"); + } + } + + @Override + public void report(PrintStream out) + { + super.report(out); + double avgLatency = totalLatency/(double)sampleCount; + out.append('\t') + .append(String.valueOf(minLatency)) + .append('\t') + .append(String.valueOf(maxLatency)) + .append('\t') + .append(df.format(avgLatency)); + + out.flush(); + } + + @Override + public void header(PrintStream out) + { + super.header(out); + out.append('\t') + .append("l-min") + .append('\t') + .append("l-max") + .append('\t') + .append("l-avg"); + + out.flush(); + } + + public void clear() + { + super.clear(); + minLatency = 0; + maxLatency = 0; + totalLatency = 0; + sampleCount = 0; + } + } + +} |
