summaryrefslogtreecommitdiff
path: root/qpid/java/tools/src
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2015-04-15 09:47:28 +0000
committerAlex Rudyy <orudyy@apache.org>2015-04-15 09:47:28 +0000
commit0a0baee45ebcff44635907d457c4ff6810b09c87 (patch)
tree8bfb0f9eddbc23cff88af69be80ab3ce7d47011c /qpid/java/tools/src
parent54aa3d7070da16ce55c28ccad3f7d0871479e461 (diff)
downloadqpid-python-0a0baee45ebcff44635907d457c4ff6810b09c87.tar.gz
QPID-6481: Move java source tree to top level
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/tools/src')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java154
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java27
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java197
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java370
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java123
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java329
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java201
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java450
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java190
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java231
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java210
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java450
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java64
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java904
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java205
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java303
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java667
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java446
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java134
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java113
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java167
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java40
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java145
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java172
-rw-r--r--qpid/java/tools/src/main/resources/stress-test-client.properties3
26 files changed, 0 insertions, 6511 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
deleted file mode 100644
index b10129d855..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public abstract class Client implements ExceptionListener
-{
- private Connection con;
- private Session ssn;
- private boolean durable = false;
- private boolean transacted = false;
- private int txSize = 10;
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
- private String contentType = "application/octet-stream";
-
- private long reportFrequency = 60000; // every min
-
- private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- private NumberFormat nf = new DecimalFormat("##.00");
-
- private long startTime = System.currentTimeMillis();
- private ErrorHandler errorHandler = null;
-
- public Client(Connection con) throws Exception
- {
- this.con = con;
- this.con.setExceptionListener(this);
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- txSize = Integer.getInteger("tx_size",10);
- contentType = System.getProperty("content_type","application/octet-stream");
- reportFrequency = Long.getLong("report_frequency", 60000);
- }
-
- public void close()
- {
- try
- {
- con.close();
- }
- catch (Exception e)
- {
- handleError("Error closing connection",e);
- }
- }
-
- public void onException(JMSException e)
- {
- handleError("Connection error",e);
- }
-
- public void setErrorHandler(ErrorHandler h)
- {
- this.errorHandler = h;
- }
-
- public void handleError(String msg,Exception e)
- {
- if (errorHandler != null)
- {
- errorHandler.handleError(msg, e);
- }
- else
- {
- System.err.println(msg);
- e.printStackTrace();
- }
- }
-
- protected Session getSsn()
- {
- return ssn;
- }
-
- protected void setSsn(Session ssn)
- {
- this.ssn = ssn;
- }
-
- protected boolean isDurable()
- {
- return durable;
- }
-
- protected boolean isTransacted()
- {
- return transacted;
- }
-
- protected int getTxSize()
- {
- return txSize;
- }
-
- protected int getAck_mode()
- {
- return ack_mode;
- }
-
- protected String getContentType()
- {
- return contentType;
- }
-
- protected long getReportFrequency()
- {
- return reportFrequency;
- }
-
- protected long getStartTime()
- {
- return startTime;
- }
-
- protected void setStartTime(long startTime)
- {
- this.startTime = startTime;
- }
-
- public DateFormat getDf()
- {
- return df;
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
deleted file mode 100644
index de7748acd6..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-public interface ErrorHandler {
-
- public void handleError(String msg,Exception e);
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
deleted file mode 100644
index 8dcf59e9c1..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * A generic receiver which consumes messages
- * from a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It participates in a feedback loop to ensure the producer
- * doesn't fill up the queue. If it receives an "End" msg
- * it sends a reply to the replyTo address in that msg.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * sync_rcv - Whether to consume sync (instead of using a listener).
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs. *
- * check_for_dups - check for duplicate messages and out of order messages.
- * jms_durable_sub - create a durable subscription instead of a regular subscription.
- */
-public class Receiver extends Client implements MessageListener
-{
- private long msg_count = 0;
- private int sequence = 0;
- private boolean syncRcv = Boolean.getBoolean("sync_rcv");
- private boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
- private boolean checkForDups = Boolean.getBoolean("check_for_dups");
- private MessageConsumer consumer;
- private List<Integer> duplicateMessages = new ArrayList<Integer>();
-
- public Receiver(Connection con,String addr) throws Exception
- {
- super(con);
- setSsn(con.createSession(isTransacted(), getAck_mode()));
- consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
- if (!syncRcv)
- {
- consumer.setMessageListener(this);
- }
-
- System.out.println("Receiving messages from : " + addr);
- }
-
- public void onMessage(Message msg)
- {
- handleMessage(msg);
- }
-
- public void run() throws Exception
- {
- long sleepTime = getReportFrequency();
- while(true)
- {
- if(syncRcv)
- {
- long t = sleepTime;
- while (t > 0)
- {
- long start = System.currentTimeMillis();
- Message msg = consumer.receive(t);
- t = t - (System.currentTimeMillis() - start);
- handleMessage(msg);
- }
- }
- Thread.sleep(sleepTime);
- System.out.println(getDf().format(System.currentTimeMillis())
- + " - messages received : " + msg_count);
- }
- }
-
- private void handleMessage(Message m)
- {
- if (m == null) { return; }
-
- try
- {
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
- Message controlMsg = getSsn().createTextMessage();
- temp.send(controlMsg);
- if (isTransacted())
- {
- getSsn().commit();
- }
- temp.close();
- }
- else
- {
-
- int seq = m.getIntProperty("sequence");
- if (checkForDups)
- {
- if (seq == 0)
- {
- sequence = 0; // wrap around for each iteration
- System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
- duplicateMessages.clear();
- }
-
- if (seq < sequence)
- {
- duplicateMessages.add(seq);
- }
- else if (seq == sequence)
- {
- sequence++;
- msg_count ++;
- }
- else
- {
- // Multiple publishers are not allowed in this test case.
- // So out of order messages are not allowed.
- throw new Exception(": Received an out of order message (expected="
- + sequence + ",received=" + seq + ")" );
- }
- }
- else
- {
- msg_count ++;
- }
-
- // Please note that this test case doesn't expect duplicates
- // When testing for transactions.
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- handleError("Exception receiving messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Receiver rcv = new Receiver(con,addr);
- rcv.run();
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
deleted file mode 100644
index 14b9b7302f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.tools.MessageFactory;
-
-/**
- * A generic sender which sends a stream of messages
- * to a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It has a feedback loop to ensure it doesn't fill
- * up queues due to a slow consumer.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * msg_size (256)
- * msg_count (10) - # messages before waiting for feedback
- * sleep_time (1000 ms) - sleep time btw each iteration
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs.
- */
-public class Sender extends Client
-{
- protected int msg_size = 256;
- protected int msg_count = 10;
- protected int iterations = -1;
- protected long sleep_time = 1000;
-
- protected Destination dest = null;
- protected Destination replyTo = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- protected MessageProducer producer;
- Random gen = new Random(19770905);
-
- public Sender(Connection con,String addr) throws Exception
- {
- super(con);
- this.msg_size = Integer.getInteger("msg_size", 100);
- this.msg_count = Integer.getInteger("msg_count", 10);
- this.iterations = Integer.getInteger("iterations", -1);
- this.sleep_time = Long.getLong("sleep_time", 1000);
- this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
- this.dest = new AMQAnyDestination(addr);
- this.producer = getSsn().createProducer(dest);
- this.replyTo = getSsn().createTemporaryQueue();
-
- System.out.println("Sending messages to : " + addr);
- }
-
- /*
- * If msg_size not specified it generates a message
- * between 500-1500 bytes.
- */
- protected Message getNextMessage() throws Exception
- {
- int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
- Message msg = (getContentType().equals("text/plain")) ?
- MessageFactory.createTextMessage(getSsn(), s):
- MessageFactory.createBytesMessage(getSsn(), s);
-
- msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT);
- return msg;
- }
-
- public void run()
- {
- try
- {
- boolean infinite = (iterations == -1);
- for (int x=0; infinite || x < iterations; x++)
- {
- long now = System.currentTimeMillis();
- if (now - getStartTime() >= getReportFrequency())
- {
- System.out.println(df.format(now) + " - iterations : " + x);
- setStartTime(now);
- }
-
- for (int i = 0; i < msg_count; i++)
- {
- Message msg = getNextMessage();
- msg.setIntProperty("sequence",i);
- producer.send(msg);
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- TextMessage m = getSsn().createTextMessage("End");
- m.setJMSReplyTo(replyTo);
- producer.send(m);
-
- if (isTransacted())
- {
- getSsn().commit();
- }
-
- MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
- feedbackConsumer.receive();
- feedbackConsumer.close();
- if (isTransacted())
- {
- getSsn().commit();
- }
- Thread.sleep(sleep_time);
- }
- }
- catch (Exception e)
- {
- handleError("Exception sending messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Sender sender = new Sender(con,addr);
- sender.run();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
deleted file mode 100644
index 0c94030ec6..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.thread.Threading;
-
-/**
- * A basic test case class that could launch a Sender/Receiver
- * or both, each on it's own separate thread.
- *
- * If con_count == ssn_count, then each entity created will have
- * it's own Connection. Else if con_count {@literal <} ssn_count, then
- * a connection will be shared by ssn_count/con_count # of entities.
- *
- * The if both sender and receiver options are set, it will
- * share a connection.
- *
- * The following options are available as jvm args
- * host, port
- * con_count,ssn_count
- * con_idle_time - which determines heartbeat
- * sender, receiver - booleans which indicate which entity to create.
- * Setting them both is also a valid option.
- */
-public class TestLauncher implements ErrorHandler
-{
- protected String host = "127.0.0.1";
- protected int port = 5672;
- protected int sessions_per_con = 1;
- protected int connection_count = 1;
- protected long heartbeat = 5000;
- protected boolean sender = false;
- protected boolean receiver = false;
- protected boolean useUniqueDests = false;
- protected String url;
-
- protected String address = "my_queue; {create: always}";
- protected boolean durable = false;
- protected String failover = "";
- protected AMQConnection controlCon;
- protected Destination controlDest = null;
- protected Session controlSession = null;
- protected MessageProducer statusSender;
- protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
- protected String testName;
-
- public TestLauncher()
- {
- testName = System.getProperty("test_name","UNKNOWN");
- host = System.getProperty("host", "127.0.0.1");
- port = Integer.getInteger("port", 5672);
- sessions_per_con = Integer.getInteger("ssn_per_con", 1);
- connection_count = Integer.getInteger("con_count", 1);
- heartbeat = Long.getLong("heartbeat", 5);
- sender = Boolean.getBoolean("sender");
- receiver = Boolean.getBoolean("receiver");
- useUniqueDests = Boolean.getBoolean("use_unique_dests");
-
- failover = System.getProperty("failover", "");
- durable = Boolean.getBoolean("durable");
-
- url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
-
- if (failover.equalsIgnoreCase("failover_exchange"))
- {
- url += "&failover='failover_exchange'";
-
- System.out.println("Failover exchange " + url );
- }
-
- configureLogging();
- }
-
- protected void configureLogging()
- {
- PatternLayout layout = new PatternLayout();
- layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
- BasicConfigurator.configure(new ConsoleAppender(layout));
-
- String logLevel = System.getProperty("log.level","warn");
- String logComponent = System.getProperty("log.comp","org.apache.qpid");
-
- Logger logger = Logger.getLogger(logComponent);
- logger.setLevel(Level.toLevel(logLevel, Level.WARN));
-
- System.out.println("Level " + logger.getLevel());
-
- }
-
- public void setUpControlChannel()
- {
- try
- {
- controlCon = new AMQConnection(url);
- controlCon.start();
-
- controlDest = new AMQAnyDestination("control; {create: always}"); // durable
-
- // Create the session to setup the messages
- controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
- statusSender = controlSession.createProducer(controlDest);
-
- }
- catch (Exception e)
- {
- handleError("Error while setting up the test",e);
- }
- }
-
- public void cleanup()
- {
- try
- {
- controlSession.close();
- controlCon.close();
- for (AMQConnection con : clients)
- {
- con.close();
- }
- }
- catch (Exception e)
- {
- handleError("Error while tearing down the test",e);
- }
- }
-
- public void start(String addr)
- {
- try
- {
- if (addr == null)
- {
- addr = address;
- }
-
- int ssn_per_con = sessions_per_con;
- String addrTemp = addr;
- for (int i = 0; i< connection_count; i++)
- {
- AMQConnection con = new AMQConnection(url);
- con.start();
- clients.add(con);
- for (int j = 0; j< ssn_per_con; j++)
- {
- String index = createPrefix(i,j);
- if (useUniqueDests)
- {
- addrTemp = modifySubject(index,addr);
- }
-
- if (sender)
- {
- createSender(index,con,addrTemp,this);
- }
-
- if (receiver)
- {
- System.out.println("########## Creating receiver ##################");
-
- createReceiver(index,con,addrTemp,this);
- }
- }
- }
- }
- catch (Exception e)
- {
- handleError("Exception while setting up the test",e);
- }
-
- }
-
- protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Receiver rcv = new Receiver(con,addr);
- rcv.setErrorHandler(h);
- rcv.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Receiver", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Receive thread",e);
- }
-
- t.setName("ReceiverThread-" + index);
- t.start();
- }
-
- protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Sender sender = new Sender(con, addr);
- sender.setErrorHandler(h);
- sender.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Sender", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Sender thread",e);
- }
-
- t.setName("SenderThread-" + index);
- t.start();
- }
-
- public synchronized void handleError(String msg,Exception e)
- {
- // In case sending the message fails
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" @ ");
- sb.append(df.format(new Date(System.currentTimeMillis())));
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
-
- try
- {
- TextMessage errorMsg = controlSession.createTextMessage();
- errorMsg.setStringProperty("status", "error");
- errorMsg.setStringProperty("desc", msg);
- errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
- errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
-
- System.out.println("Msg " + errorMsg);
-
- statusSender.send(errorMsg);
- }
- catch (JMSException e1)
- {
- e1.printStackTrace();
- }
- }
-
- private String serializeStackTrace(Exception e)
- {
- ByteArrayOutputStream bOut = new ByteArrayOutputStream();
- PrintStream printStream = new PrintStream(bOut);
- e.printStackTrace(printStream);
- printStream.close();
- return bOut.toString();
- }
-
- private String createPrefix(int i, int j)
- {
- return String.valueOf(i).concat(String.valueOf(j));
- }
-
- /**
- * A basic helper function to modify the subjects by
- * appending an index.
- */
- private String modifySubject(String index,String addr)
- {
- if (addr.indexOf("/") > 0)
- {
- addr = addr.substring(0,addr.indexOf("/")+1) +
- index +
- addr.substring(addr.indexOf("/")+1,addr.length());
- }
- else if (addr.indexOf(";") > 0)
- {
- addr = addr.substring(0,addr.indexOf(";")) +
- "/" + index +
- addr.substring(addr.indexOf(";"),addr.length());
- }
- else
- {
- addr = addr + "/" + index;
- }
-
- return addr;
- }
-
- public static void main(String[] args)
- {
- final TestLauncher test = new TestLauncher();
- test.setUpControlChannel();
- System.out.println("args.length " + args.length);
- System.out.println("args [0] " + args [0]);
- test.start(args.length > 0 ? args [0] : null);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() { test.cleanup(); }
- });
-
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
deleted file mode 100644
index 7eb83a520b..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * In the future this will be replaced by a Clock abstraction
- * that can utilize a realtime clock when running in RT Java.
- */
-
-public class Clock
-{
- private static final Logger _logger = LoggerFactory.getLogger(Clock.class);
-
- public final static long SEC = 60000;
-
- private static Precision precision;
- private static long offset = -1; // in nano secs
-
- public enum Precision
- {
- NANO_SECS, MILI_SECS;
-
- static Precision getPrecision(String str)
- {
- if ("mili".equalsIgnoreCase(str))
- {
- return MILI_SECS;
- }
- else
- {
- return NANO_SECS;
- }
- }
- };
-
- static
- {
- precision = Precision.getPrecision(System.getProperty("precision","mili"));
- //offset = Long.getLong("offset",-1);
-
- if (_logger.isDebugEnabled())
- {
- System.out.println("Using precision : " + precision );
- //+ " and offset " + offset);
- }
- }
-
- public static Precision getPrecision()
- {
- return precision;
- }
-
- public static long getTime()
- {
- if (precision == Precision.NANO_SECS)
- {
- if (offset == -1)
- {
- return System.nanoTime();
- }
- else
- {
- return System.nanoTime() + offset;
- }
- }
- else
- {
- if (offset == -1)
- {
- return System.currentTimeMillis();
- }
- else
- {
- return System.currentTimeMillis() + offset/convertToMiliSecs();
- }
- }
- }
-
- public static long convertToSecs()
- {
- if (precision == Precision.NANO_SECS)
- {
- return 1000000000;
- }
- else
- {
- return 1000;
- }
- }
-
- public static long convertToMiliSecs()
- {
- if (precision == Precision.NANO_SECS)
- {
- return 1000000;
- }
- else
- {
- return 1;
- }
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java
deleted file mode 100644
index 1b3c961660..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.qpid.tools.util.ArgumentsParser;
-
-public class JMXStressTestClient
-{
-
- public static void main(String[] args) throws Exception
- {
- ArgumentsParser parser = new ArgumentsParser();
- Arguments arguments;
- try
- {
- arguments = parser.parse(args, Arguments.class);
- arguments.validate();
- }
- catch(IllegalArgumentException e)
- {
- System.out.println("Invalid argument:" + e.getMessage());
- parser.usage(Arguments.class, Arguments.REQUIRED);
- System.out.println("\nRun example:");
- System.out.println(" java -cp qpid-tools.jar org.apache.qpid.tools.JMXStressTestClient \\");
- System.out.println(" repetitions=10 host=localhost port=8999 username=admin password=admin \\");
- System.out.println(" virtualHost=default createQueue=true bindQueue=true deleteQueue=true \\");
- System.out.println(" uniqueQueues=true queueName=boo exchangeName=amq.fanout");
- return;
- }
-
- JMXStressTestClient client = new JMXStressTestClient();
- client.run(arguments);
- }
-
- public void run(Arguments arguments) throws IOException,MalformedObjectNameException
- {
- log(arguments.toString());
- for (int i = 0; i < arguments.getRepetitions(); i++)
- {
- try(JMXConnector connector = createConnector(arguments.getHost(), arguments.getPort(), arguments.getUsername(), arguments.getPassword()))
- {
- runIteration(arguments, connector, i);
- }
- }
- }
-
- private void runIteration(Arguments arguments, JMXConnector connector, int iteration) throws IOException, MalformedObjectNameException
- {
- log("Iteration " + iteration);
- MBeanServerConnection connection = connector.getMBeanServerConnection();
- String virtualHost = arguments.getVirtualHost();
- if (virtualHost != null)
- {
- ObjectName virtualHostMBeanName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost="
- + ObjectName.quote(virtualHost));
-
- Set<ObjectName> virtualHostMBeans = connection.queryNames(virtualHostMBeanName, null);
- if(virtualHostMBeans.size() == 0)
- {
- throw new IllegalArgumentException("VirtualHost MBean was not found for virtual host " + virtualHost);
- }
-
- createAndBindQueueIfRequired(arguments, iteration, connection, virtualHostMBeanName);
- }
- }
-
- private void log(String logMessage)
- {
- System.out.println(logMessage);
- }
-
- private void createAndBindQueueIfRequired(Arguments arguments, int iteration, MBeanServerConnection connection,
- ObjectName virtualHostMBeanName) throws MalformedObjectNameException, IOException
- {
- if (arguments.isCreateQueue())
- {
- String queueName = arguments.getQueueName();
-
- if (queueName == null)
- {
- queueName = "temp-queue-" + System.nanoTime();
- }
- else if (arguments.isUniqueQueues())
- {
- queueName = queueName + "-" + iteration;
- }
-
- createQueue(connection, virtualHostMBeanName, queueName);
-
- if (arguments.isBindQueue())
- {
- bindQueue(connection, arguments.getVirtualHost(), queueName, arguments.getExchangeName());
- }
-
- if (arguments.isDeleteQueue())
- {
- deleteQueue(connection, virtualHostMBeanName, queueName);
- }
- }
- }
-
- private void deleteQueue(MBeanServerConnection connection, ObjectName virtualHostMBeanName, String queueName)
- {
- log(" Delete queue " + queueName);
- try
- {
- connection.invoke(virtualHostMBeanName, "deleteQueue", new Object[]{queueName}, new String[]{String.class.getName()});
- }
- catch (Exception e)
- {
- throw new RuntimeException("Cannot delete queue " + queueName, e);
- }
- }
-
- private void createQueue(MBeanServerConnection connection, ObjectName virtualHostMBeanName, String queueName)
- {
- log(" Create queue " + queueName);
- try
- {
- connection.invoke(virtualHostMBeanName, "createNewQueue", new Object[]{queueName, null, true},
- new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()});
- }
- catch (Exception e)
- {
- throw new RuntimeException("Cannot create queue " + queueName, e);
- }
- }
-
- private void bindQueue(MBeanServerConnection connection, String virtualHost, String queueName, String exchangeName)
- throws MalformedObjectNameException, IOException
- {
- if (exchangeName == null)
- {
- exchangeName = "amq.direct";
- }
-
- log(" Bind queue " + queueName + " to " + exchangeName + " using binding key " + queueName);
-
- ObjectName exchangeObjectName = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="
- + ObjectName.quote(virtualHost) + ","
- + "name=" + ObjectName.quote(exchangeName) + ",ExchangeType=*");
-
- Set<ObjectName> exchanges = connection.queryNames(exchangeObjectName, null);
-
- if(exchanges.size() == 0)
- {
- throw new IllegalArgumentException("Cannot find exchange MBean for exchange " + exchangeName);
- }
-
- try
- {
- connection.invoke(exchanges.iterator().next(), "createNewBinding", new Object[]{queueName, queueName},
- new String[]{String.class.getName(), String.class.getName()});
- }
- catch (Exception e)
- {
- throw new RuntimeException("Cannot delete queue " + queueName, e);
- }
- }
-
- JMXConnector createConnector(String host, int port, String username, String password) throws IOException
- {
- Map<String, Object> env = new HashMap<>();
- JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
- env.put(JMXConnector.CREDENTIALS, new String[] {username,password});
-
- return JMXConnectorFactory.connect(jmxUrl, env);
- }
-
- public static class Arguments
- {
- private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("host", "port", "username", "password"));
-
- private String host = null;
- private int port = -1;
- private String username = null;
- private String password = null;
-
- private String virtualHost = null;
- private String queueName = null;
- private String exchangeName = null;
-
- private int repetitions = 1;
-
- private boolean createQueue = false;
- private boolean deleteQueue = false;
- private boolean uniqueQueues = false;
- private boolean bindQueue = false;
-
- public Arguments()
- {
- }
-
- public void validate()
- {
- if (host == null || host.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'host' is not specified");
- }
-
- if (port == -1)
- {
- throw new IllegalArgumentException("Mandatory argument 'port' is not specified");
- }
-
- if (username == null || username.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'username' is not specified");
- }
-
- if (password == null || password.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'password' is not specified");
- }
- }
-
- public int getRepetitions()
- {
- return repetitions;
- }
-
- public String getHost()
- {
- return host;
- }
-
- public int getPort()
- {
- return port;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public boolean isCreateQueue()
- {
- return createQueue;
- }
-
- public boolean isDeleteQueue()
- {
- return deleteQueue;
- }
-
- public boolean isUniqueQueues()
- {
- return uniqueQueues;
- }
-
- public String getQueueName()
- {
- return queueName;
- }
-
- public boolean isBindQueue()
- {
- return bindQueue;
- }
-
- public String getExchangeName()
- {
- return exchangeName;
- }
-
- @Override
- public String toString()
- {
- return "Arguments{" +
- "host='" + host + '\'' +
- ", port=" + port +
- ", username='" + username + '\'' +
- ", password='" + password + '\'' +
- ", virtualHost='" + virtualHost + '\'' +
- ", queueName='" + queueName + '\'' +
- ", exchangeName='" + exchangeName + '\'' +
- ", repetitions=" + repetitions +
- ", createQueue=" + createQueue +
- ", deleteQueue=" + deleteQueue +
- ", uniqueQueues=" + uniqueQueues +
- ", bindQueue=" + bindQueue +
- '}';
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
deleted file mode 100644
index bd6cfd4363..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Hashtable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.jms.FailoverPolicy;
-
-public class JNDICheck
-{
- private static final String QUEUE = "queue.";
- private static final String TOPIC = "topic.";
- private static final String DESTINATION = "destination.";
- private static final String CONNECTION_FACTORY = "connectionfactory.";
-
- public static void main(String[] args)
- {
-
- if (args.length != 1)
- {
- usage();
- }
-
- String propertyFile = args[0];
-
- new JNDICheck(propertyFile);
- }
-
- private static void usage()
- {
- exit("Usage: JNDICheck <JNDI Config file>", 0);
- }
-
- private static void exit(String message, int exitCode)
- {
- System.err.println(message);
- System.exit(exitCode);
- }
-
- private static String JAVA_NAMING = "java.naming.factory.initial";
-
- private Context _context = null;
- private Hashtable _environment = null;
-
- public JNDICheck(String propertyFile)
- {
-
- // Load JNDI properties
- Properties properties = new Properties();
-
- try(FileInputStream propertiesStream = new FileInputStream(new File(propertyFile)))
- {
- properties.load(propertiesStream);
- }
- catch (IOException e)
- {
- exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1);
- }
-
- //Create the initial context
- try
- {
-
- System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING));
-
- _context = new InitialContext(properties);
-
- _environment = _context.getEnvironment();
-
- Enumeration keys = _environment.keys();
-
- List<String> queues = new LinkedList<String>();
- List<String> topics = new LinkedList<String>();
- List<String> destinations = new LinkedList<String>();
- List<String> connectionFactories = new LinkedList<String>();
-
- while (keys.hasMoreElements())
- {
- String key = keys.nextElement().toString();
-
- if (key.startsWith(QUEUE))
- {
- queues.add(key);
- }
- else if (key.startsWith(TOPIC))
- {
- topics.add(key);
- }
- else if (key.startsWith(DESTINATION))
- {
- destinations.add(key);
- }
- else if (key.startsWith(CONNECTION_FACTORY))
- {
- connectionFactories.add(key);
- }
- }
-
- printHeader(propertyFile);
- printEntries(QUEUE, queues);
- printEntries(TOPIC, topics);
- printEntries(DESTINATION, destinations);
- printEntries(CONNECTION_FACTORY, connectionFactories);
-
- }
- catch (NamingException e)
- {
- exit("Unable to load JNDI Context due to:" + e.getMessage(), 1);
- }
-
- }
-
- private void printHeader(String file)
- {
- print("JNDI file :" + file);
- }
-
- private void printEntries(String type, List<String> list)
- {
- if (list.size() > 0)
- {
- String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1);
- print(name + " elements in file:");
- printList(list);
- print("");
- }
- }
-
- private void printList(List<String> list)
- {
- for (String item : list)
- {
- String key = item.substring(item.indexOf('.') + 1);
-
- try
- {
- print(key, _context.lookup(key));
- }
- catch (NamingException e)
- {
- exit("Error: item " + key + " no longer in context.", 1);
- }
- }
- }
-
- private void print(String key, Object object)
- {
- if (object instanceof AMQDestination)
- {
- print(key + ":" + object);
- }
- else if (object instanceof AMQConnectionFactory)
- {
- AMQConnectionFactory factory = (AMQConnectionFactory) object;
- print(key + ":Connection");
- print("ConnectionURL:");
- print(factory.getConnectionURL().toString());
- print("FailoverPolicy");
- print(new FailoverPolicy(factory.getConnectionURL(),null).toString());
- print("");
- }
- }
-
- private void print(String msg)
- {
- System.out.println(msg);
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
deleted file mode 100644
index e0e48519f3..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQConnection;
-
-public class JVMArgConfiguration implements TestConfiguration
-{
- /*
- * By default the connection URL is used.
- * This allows a user to easily specify a fully fledged URL any given property.
- * Ex. SSL parameters
- *
- * By providing a host & port allows a user to simply override the URL.
- * This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
- */
- private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
- private String host = "";
-
- private int port = -1;
-
- private String address = "queue; {create : always}";
-
- private long timeout = 0;
-
- private int msg_size = 1024;
-
- private int random_msg_size_start_from = 1;
-
- private boolean cacheMessage = false;
-
- private boolean disableMessageID = false;
-
- private boolean disableTimestamp = false;
-
- private boolean durable = false;
-
- private int transaction_size = 0;
-
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- private int msg_count = 10;
-
- private int warmup_count = 1;
-
- private boolean random_msg_size = false;
-
- private String msgType = "bytes";
-
- private boolean printStdDev = false;
-
- private int sendRate = 0;
-
- private boolean externalController = false;
-
- private boolean useUniqueDest = false; // useful when using multiple connections.
-
- private int ackFrequency = 100;
-
- private DecimalFormat df = new DecimalFormat("###.##");
-
- private int reportEvery = 0;
-
- private boolean isReportTotal = false;
-
- private boolean isReportHeader = true;
-
- private int sendEOS = 0;
-
- private int connectionCount = 1;
-
- private int rollbackFrequency = 0;
-
- private boolean printHeaders;
-
- private boolean printContent;
-
- private long ttl;
-
- private int priority;
-
- private String readyAddress;
-
- public JVMArgConfiguration()
- {
-
- url = System.getProperty("url",url);
- host = System.getProperty("host","");
- port = Integer.getInteger("port", -1);
- address = System.getProperty("address",address);
-
- timeout = Long.getLong("timeout",0);
- msg_size = Integer.getInteger("msg-size", 0);
- cacheMessage = true; //Boolean.getBoolean("cache-msg");
- disableMessageID = Boolean.getBoolean("disable-message-id");
- disableTimestamp = Boolean.getBoolean("disable-timestamp");
- durable = Boolean.getBoolean("durable");
- transaction_size = Integer.getInteger("tx",1000);
- ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg-count",msg_count);
- warmup_count = Integer.getInteger("warmup-count",warmup_count);
- random_msg_size = Boolean.getBoolean("random-msg-size");
- msgType = System.getProperty("msg-type","bytes");
- printStdDev = Boolean.getBoolean("print-std-dev");
- sendRate = Integer.getInteger("rate",0);
- externalController = Boolean.getBoolean("ext-controller");
- useUniqueDest = Boolean.getBoolean("use-unique-dest");
- random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1);
- reportEvery = Integer.getInteger("report-every",0);
- isReportTotal = Boolean.getBoolean("report-total");
- isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header");
- sendEOS = Integer.getInteger("send-eos",1);
- connectionCount = Integer.getInteger("con_count",1);
- ackFrequency = Integer.getInteger("ack-frequency",100);
- rollbackFrequency = Integer.getInteger("rollback-frequency",0);
- printHeaders = Boolean.getBoolean("print-headers");
- printContent = Boolean.getBoolean("print-content");
- ttl = Long.getLong("ttl", 0);
- priority = Integer.getInteger("priority", 0);
- readyAddress = System.getProperty("ready-address");
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getUrl()
- */
- @Override
- public String getUrl()
- {
- return url;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getHost()
- */
- @Override
- public String getHost()
- {
- return host;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getPort()
- */
- @Override
- public int getPort()
- {
- return port;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAddress()
- */
- @Override
- public String getAddress()
- {
- return address;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getTimeout()
- */
- @Override
- public long getTimeout()
- {
- return timeout;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAckMode()
- */
- @Override
- public int getAckMode()
- {
- return ack_mode;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMsgCount()
- */
- @Override
- public int getMsgCount()
- {
- return msg_count;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMsgSize()
- */
- @Override
- public int getMsgSize()
- {
- return msg_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom()
- */
- @Override
- public int getRandomMsgSizeStartFrom()
- {
- return random_msg_size_start_from;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDurable()
- */
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isTransacted()
- */
- @Override
- public boolean isTransacted()
- {
- return transaction_size > 0;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize()
- */
- @Override
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount()
- */
- @Override
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage()
- */
- @Override
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID()
- */
- @Override
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp()
- */
- @Override
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize()
- */
- @Override
- public boolean isRandomMsgSize()
- {
- return random_msg_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMessageType()
- */
- @Override
- public String getMessageType()
- {
- return msgType;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev()
- */
- @Override
- public boolean isPrintStdDev()
- {
- return printStdDev;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getSendRate()
- */
- @Override
- public int getSendRate()
- {
- return sendRate;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isExternalController()
- */
- @Override
- public boolean isExternalController()
- {
- return externalController;
- }
-
- public void setAddress(String addr)
- {
- address = addr;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests()
- */
- @Override
- public boolean isUseUniqueDests()
- {
- return useUniqueDest;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency()
- */
- @Override
- public int getAckFrequency()
- {
- return ackFrequency;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#createConnection()
- */
- @Override
- public Connection createConnection() throws Exception
- {
- if (getHost().equals("") || getPort() == -1)
- {
- return new AMQConnection(getUrl());
- }
- else
- {
- return new AMQConnection(getHost(),getPort(),"guest","guest","test","test");
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat()
- */
- @Override
- public DecimalFormat getDecimalFormat()
- {
- return df;
- }
-
- @Override
- public int reportEvery()
- {
- return reportEvery;
- }
-
- @Override
- public boolean isReportTotal()
- {
- return isReportTotal;
- }
-
- @Override
- public boolean isReportHeader()
- {
- return isReportHeader;
- }
-
- @Override
- public int getSendEOS()
- {
- return sendEOS;
- }
-
- @Override
- public int getConnectionCount()
- {
- return connectionCount;
- }
-
- @Override
- public int getRollbackFrequency()
- {
- return rollbackFrequency;
- }
-
- @Override
- public boolean isPrintHeaders()
- {
- return printHeaders;
- }
-
- @Override
- public boolean isPrintContent()
- {
- return printContent;
- }
-
- @Override
- public long getTTL()
- {
- return ttl;
- }
-
- @Override
- public int getPriority()
- {
- return priority;
- }
-
- @Override
- public String getReadyAddress()
- {
- return readyAddress;
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
deleted file mode 100644
index 7ceef47573..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.net.InetAddress;
-import java.util.UUID;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
-
- public final static String CODE = "CODE";
- public final static String ID = "ID";
- public final static String REPLY_ADDR = "REPLY_ADDR";
- public final static String MAX_LATENCY = "MAX_LATENCY";
- public final static String MIN_LATENCY = "MIN_LATENCY";
- public final static String AVG_LATENCY = "AVG_LATENCY";
- public final static String STD_DEV = "STD_DEV";
- public final static String CONS_RATE = "CONS_RATE";
- public final static String PROD_RATE = "PROD_RATE";
- public final static String MSG_COUNT = "MSG_COUNT";
- public final static String TIMESTAMP = "Timestamp";
-
- String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
-
- TestConfiguration config;
- Connection con;
- Session session;
- Session controllerSession;
- Destination dest;
- Destination myControlQueue;
- Destination controllerQueue;
- String id;
- String myControlQueueAddr;
-
- MessageProducer sendToController;
- MessageConsumer receiveFromController;
- String prefix = "";
-
- enum OPCode
- {
- REGISTER_CONSUMER, REGISTER_PRODUCER,
- PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
- CONSUMER_READY, PRODUCER_READY,
- PRODUCER_START,
- RECEIVED_END_MSG, CONSUMER_STOP,
- RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
- CONTINUE_TEST, STOP_TEST
- };
-
- MessageType msgType = MessageType.BYTES;
-
- public MercuryBase(TestConfiguration config,String prefix)
- {
- this.config = config;
- String host = "";
- try
- {
- host = InetAddress.getLocalHost().getHostName();
- }
- catch (Exception e)
- {
- }
- id = host + "-" + UUID.randomUUID().toString();
- this.prefix = prefix;
- this.myControlQueueAddr = id + ";{create: always}";
- }
-
- public void setUp() throws Exception
- {
- con = config.createConnection();
- con.start();
-
- controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- dest = createDestination();
- controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false);
- myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(config.getMessageType());
- _logger.debug("Using " + msgType + " messages");
-
- sendToController = controllerSession.createProducer(controllerQueue);
- receiveFromController = controllerSession.createConsumer(myControlQueue);
- }
-
- private Destination createDestination() throws Exception
- {
- if (config.isUseUniqueDests())
- {
- _logger.debug("Prefix : " + prefix);
- Address addr = Address.parse(config.getAddress());
- AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false);
- int type = ((AMQSession_0_10)session).resolveAddressType(temp);
-
- if ( type == AMQDestination.TOPIC_TYPE)
- {
- addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
- System.out.println("Setting subject : " + addr);
- }
- else
- {
- addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
- System.out.println("Setting name : " + addr);
- }
-
- return AMQDestination.createDestination(addr.toString(), false);
- }
- else
- {
- return AMQDestination.createDestination(config.getAddress(), false);
- }
- }
-
- public synchronized void sendMessageToController(MapMessage m) throws Exception
- {
- m.setString(ID, id);
- m.setString(REPLY_ADDR,myControlQueueAddr);
- sendToController.send(m);
- }
-
- public void receiveFromController(OPCode expected) throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- _logger.debug("Received Code : " + code);
- if (expected != code)
- {
- throw new Exception("Expected OPCode : " + expected + " but received : " + code);
- }
-
- }
-
- public boolean continueTest() throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- _logger.debug("Received Code : " + code);
- return (code == OPCode.CONTINUE_TEST);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- controllerSession.close();
- con.close();
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
-
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
deleted file mode 100644
index b35adc45d6..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.tools.report.MercuryReporter;
-import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- *
- * Throughput
- * ===========
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class MercuryConsumerController extends MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class);
- MercuryReporter reporter;
- TestConfiguration config;
- QpidReceive receiver;
-
- public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix)
- {
- super(config,prefix);
- this.reporter = reporter;
- if (_logger.isInfoEnabled())
- {
- _logger.info("Consumer ID : " + id);
- }
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- receiver = new QpidReceive(reporter,config, con,dest);
- receiver.setUp();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- receiver.waitforCompletion(config.getWarmupCount());
-
- // It's more realistic for the consumer to signal this.
- MapMessage m1 = controllerSession.createMapMessage();
- m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m1);
-
- MapMessage m2 = controllerSession.createMapMessage();
- m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m2);
- }
-
- public void runReceiver() throws Exception
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Consumer: " + id + " Starting iteration......" + "\n");
- }
- resetCounters();
- receiver.waitforCompletion(config.getMsgCount());
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
- }
-
- public void resetCounters()
- {
- reporter.clear();
- }
-
- public void sendResults() throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STOP);
- reporter.report();
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, reporter.getAvgLatency());
- m.setDouble(MIN_LATENCY, reporter.getMinLatency());
- m.setDouble(MAX_LATENCY, reporter.getMaxLatency());
- m.setDouble(STD_DEV, reporter.getStdDev());
- m.setDouble(CONS_RATE, reporter.getRate());
- m.setLong(MSG_COUNT, reporter.getSampleSize());
- sendMessageToController(m);
-
- reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString());
- reporter.log(new StringBuilder("Consumer rate : ").
- append(config.getDecimalFormat().format(reporter.getRate())).
- append(" msg/sec").toString());
- reporter.log(new StringBuilder("Avg Latency : ").
- append(config.getDecimalFormat().format(reporter.getAvgLatency())).
- append(" ms").toString());
- reporter.log(new StringBuilder("Min Latency : ").
- append(config.getDecimalFormat().format(reporter.getMinLatency())).
- append(" ms").toString());
- reporter.log(new StringBuilder("Max Latency : ").
- append(config.getDecimalFormat().format(reporter.getMaxLatency())).
- append(" ms").toString());
- if (config.isPrintStdDev())
- {
- reporter.log(new StringBuilder("Std Dev : ").
- append(reporter.getStdDev()).toString());
- }
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- runReceiver();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true);
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = config.getConnectionCount();
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i);
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- reporter.log("Consumers have completed the test......\n");
- }
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
deleted file mode 100644
index 02377bb853..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.tools.report.MercuryReporter;
-import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
- */
-public class MercuryProducerController extends MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
- MercuryReporter reporter;
- QpidSend sender;
-
- public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix)
- {
- super(config,prefix);
- this.reporter = reporter;
- System.out.println("Producer ID : " + id);
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- sender = new QpidSend(reporter,config, con,dest);
- sender.setUp();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Producer: " + id + " Warming up......");
- }
- sender.send(config.getWarmupCount());
- sender.sendEndMessage();
- }
-
- public void runSender() throws Exception
- {
- resetCounters();
- receiveFromController(OPCode.PRODUCER_START);
- sender.send(config.getMsgCount());
- }
-
- public void resetCounters()
- {
- sender.resetCounters();
- }
-
- public void sendResults() throws Exception
- {
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, reporter.getRate());
- sendMessageToController(msg);
- reporter.log(new StringBuilder("Producer rate: ").
- append(config.getDecimalFormat().format(reporter.getRate())).
- append(" msg/sec").
- toString());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- sender.tearDown();
- super.tearDown();
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- if(_logger.isInfoEnabled())
- {
- _logger.info("=========================================================\n");
- _logger.info("Producer: " + id + " starting a new iteration ......\n");
- }
- runSender();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public void startControllerIfNeeded()
- {
- if (!config.isExternalController())
- {
- final MercuryTestController controller = new MercuryTestController(config);
- Runnable r = new Runnable()
- {
- public void run()
- {
- controller.run();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating controller thread",e);
- }
- t.start();
- }
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true);
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = config.getConnectionCount();
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i);
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- reporter.log("Producers have completed the test......");
- }
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
deleted file mode 100644
index 8c66a1e44d..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.FileWriter;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.client.message.AMQPEncodedMapMessage;
-import org.apache.qpid.tools.report.Reporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The Controller coordinates a test run between a number
- * of producers and consumers, configured via -Dprod_count and -Dcons_count.
- *
- * It waits till all the producers and consumers have registered and then
- * conducts a warmup run. Once all consumers and producers have completed
- * the warmup run and is ready, it will conduct the actual test run and
- * collect all stats from the participants and calculates the system
- * throughput, the avg/min/max for producer rates, consumer rates and latency.
- *
- * These stats are then printed to std out.
- * The Controller also prints events to std out to give a running account
- * of the test run in progress. Ex registering of participants, starting warmup ..etc.
- * This allows a scripting tool to monitor the progress.
- *
- * The Controller can be run in two modes.
- * 1. A single test run (default) where it just runs until the message count specified
- * for the producers via -Dmsg_count is sent and received.
- *
- * 2. Time based, configured via -Dduration=x, where x is in mins.
- * In this mode, the Controller repeatedly cycles through the tests (after an initial
- * warmup run) until the desired time is reached. If a test run is in progress
- * and the time is up, it will allow the run the complete.
- *
- * After each iteration, the stats will be printed out in csv format to a separate log file.
- * System throughput is calculated as follows
- * totalMsgCount/(totalTestTime)
- */
-public class MercuryTestController extends MercuryBase implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
-
- enum TestMode { SINGLE_RUN, TIME_BASED };
-
- TestMode testMode = TestMode.SINGLE_RUN;
-
- long totalTestTime;
-
- private double avgSystemLatency = 0.0;
- private double minSystemLatency = Double.MAX_VALUE;
- private double maxSystemLatency = 0;
- private double avgSystemLatencyStdDev = 0.0;
-
- private double avgSystemConsRate = 0.0;
- private double maxSystemConsRate = 0.0;
- private double minSystemConsRate = Double.MAX_VALUE;
-
- private double avgSystemProdRate = 0.0;
- private double maxSystemProdRate = 0.0;
- private double minSystemProdRate = Double.MAX_VALUE;
-
- private long totalMsgCount = 0;
- private double totalSystemThroughput = 0.0;
-
- private int consumerCount = Integer.getInteger("cons_count", 1);
- private int producerCount = Integer.getInteger("prod_count", 1);
- private int duration = Integer.getInteger("duration", -1); // in mins
- private Map<String,MapMessage> consumers;
- private Map<String,MapMessage> producers;
-
- private CountDownLatch consRegistered;
- private CountDownLatch prodRegistered;
- private CountDownLatch consReady;
- private CountDownLatch prodReady;
- private CountDownLatch receivedEndMsg;
- private CountDownLatch receivedConsStats;
- private CountDownLatch receivedProdStats;
-
- private MessageConsumer consumer;
- private boolean printStdDev = false;
- private FileWriter writer;
- private Reporter report;
-
- public MercuryTestController(TestConfiguration config)
- {
- super(config,"");
-
- consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
- producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
-
- consRegistered = new CountDownLatch(consumerCount);
- prodRegistered = new CountDownLatch(producerCount);
- consReady = new CountDownLatch(consumerCount);
- prodReady = new CountDownLatch(producerCount);
- printStdDev = config.isPrintStdDev();
- testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- if (testMode == TestMode.TIME_BASED)
- {
- writer = new FileWriter("stats-csv.log");
- }
- consumer = controllerSession.createConsumer(controllerQueue);
- report.log("\nController: " + producerCount + " producers are expected");
- report.log("Controller: " + consumerCount + " consumers are expected \n");
- consumer.setMessageListener(this);
- consRegistered.await();
- prodRegistered.await();
- report.log("\nController: All producers and consumers have registered......\n");
- }
-
- public void warmup() throws Exception
- {
- report.log("Controller initiating warm up sequence......");
- sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
- sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
- prodReady.await();
- consReady.await();
- report.log("\nController : All producers and consumers are ready to start the test......\n");
- }
-
- public void startTest() throws Exception
- {
- resetCounters();
- report.log("\nController Starting test......");
- long start = Clock.getTime();
- sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
- receivedEndMsg.await();
- totalTestTime = Clock.getTime() - start;
- sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values());
- receivedProdStats.await();
- receivedConsStats.await();
- }
-
- public void resetCounters()
- {
- minSystemLatency = Double.MAX_VALUE;
- maxSystemLatency = 0;
- maxSystemConsRate = 0.0;
- minSystemConsRate = Double.MAX_VALUE;
- maxSystemProdRate = 0.0;
- minSystemProdRate = Double.MAX_VALUE;
-
- totalMsgCount = 0;
-
- receivedConsStats = new CountDownLatch(consumerCount);
- receivedProdStats = new CountDownLatch(producerCount);
- receivedEndMsg = new CountDownLatch(producerCount);
- }
-
- public void calcStats() throws Exception
- {
- double totLatency = 0.0;
- double totStdDev = 0.0;
- double totalConsRate = 0.0;
- double totalProdRate = 0.0;
-
- MapMessage conStat = null; // for error handling
- try
- {
- for (MapMessage m: consumers.values())
- {
- conStat = m;
- minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY));
- maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY));
- totLatency = totLatency + m.getDouble(AVG_LATENCY);
- totStdDev = totStdDev + m.getDouble(STD_DEV);
-
- minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE));
- maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE));
- totalConsRate = totalConsRate + m.getDouble(CONS_RATE);
-
- totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT);
- }
- }
- catch(Exception e)
- {
- System.err.println("Error calculating stats from Consumer : " + conStat);
- }
-
-
- MapMessage prodStat = null; // for error handling
- try
- {
- for (MapMessage m: producers.values())
- {
- prodStat = m;
- minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE));
- maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE));
- totalProdRate = totalProdRate + m.getDouble(PROD_RATE);
- }
- }
- catch(Exception e)
- {
- System.err.println("Error calculating stats from Producer : " + conStat);
- }
-
- avgSystemLatency = totLatency/consumers.size();
- avgSystemLatencyStdDev = totStdDev/consumers.size();
- avgSystemConsRate = totalConsRate/consumers.size();
- avgSystemProdRate = totalProdRate/producers.size();
-
- report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
-
- totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
- }
-
- public void printResults() throws Exception
- {
- report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
- report.log(new StringBuilder("System Throughput : ").
- append(config.getDecimalFormat().format(totalSystemThroughput)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Avg Consumer rate : ").
- append(config.getDecimalFormat().format(avgSystemConsRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Min Consumer rate : ").
- append(config.getDecimalFormat().format(minSystemConsRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Max Consumer rate : ").
- append(config.getDecimalFormat().format(maxSystemConsRate)).
- append(" msg/sec").toString());
-
- report.log(new StringBuilder("Avg Producer rate : ").
- append(config.getDecimalFormat().format(avgSystemProdRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Min Producer rate : ").
- append(config.getDecimalFormat().format(minSystemProdRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Max Producer rate : ").
- append(config.getDecimalFormat().format(maxSystemProdRate)).
- append(" msg/sec").toString());
-
- report.log(new StringBuilder("Avg System Latency : ").
- append(config.getDecimalFormat().format(avgSystemLatency)).
- append(" ms").toString());
- report.log(new StringBuilder("Min System Latency : ").
- append(config.getDecimalFormat().format(minSystemLatency)).
- append(" ms").toString());
- report.log(new StringBuilder("Max System Latency : ").
- append(config.getDecimalFormat().format(maxSystemLatency)).
- append(" ms").toString());
- if (printStdDev)
- {
- report.log(new StringBuilder("Avg System Std Dev : ").
- append(avgSystemLatencyStdDev).toString());
- }
- }
-
- private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
- {
- report.log("\nController: Sending code " + code);
- MessageProducer tmpProd = controllerSession.createProducer(null);
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, code.ordinal());
- for (MapMessage node : nodes)
- {
- if (node.getString(REPLY_ADDR) == null)
- {
- report.log("REPLY_ADDR is null " + node);
- }
- else
- {
- report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
- }
- tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
- }
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- MapMessage m = (MapMessage)msg;
- OPCode code = OPCode.values()[m.getInt(CODE)];
-
- report.log("\n---------Controller Received Code : " + code);
- report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
-
- switch (code)
- {
- case REGISTER_CONSUMER :
- if (consRegistered.getCount() == 0)
- {
- report.log("Warning : Expected number of consumers have already registered," +
- "ignoring extra consumer");
- break;
- }
- consumers.put(m.getString(ID),m);
- consRegistered.countDown();
- break;
-
- case REGISTER_PRODUCER :
- if (prodRegistered.getCount() == 0)
- {
- report.log("Warning : Expected number of producers have already registered," +
- "ignoring extra producer");
- break;
- }
- producers.put(m.getString(ID),m);
- prodRegistered.countDown();
- break;
-
- case CONSUMER_READY :
- consReady.countDown();
- break;
-
- case PRODUCER_READY :
- prodReady.countDown();
- break;
-
- case RECEIVED_END_MSG :
- receivedEndMsg.countDown();
- break;
-
- case RECEIVED_CONSUMER_STATS :
- consumers.put(m.getString(ID),m);
- receivedConsStats.countDown();
- break;
-
- case RECEIVED_PRODUCER_STATS :
- producers.put(m.getString(ID),m);
- receivedProdStats.countDown();
- break;
-
- default:
- throw new Exception("Invalid OPCode " + code);
- }
- }
- catch (Exception e)
- {
- handleError(e,"Error when receiving messages " + msg);
- }
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- if (testMode == TestMode.SINGLE_RUN)
- {
- startTest();
- calcStats();
- printResults();
- }
- else
- {
- long startTime = Clock.getTime();
- long timeLimit = duration * 60 * 1000; // duration is in mins.
- boolean nextIteration = true;
- while (nextIteration)
- {
- startTest();
- calcStats();
- writeStatsToFile();
- if (Clock.getTime() - startTime < timeLimit)
- {
- sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values());
- sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values());
- nextIteration = true;
- }
- else
- {
- nextIteration = false;
- }
- }
- }
- tearDown();
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception {
- report.log("Controller: Completed the test......\n");
- if (testMode == TestMode.TIME_BASED)
- {
- writer.close();
- }
- sendMessageToNodes(OPCode.STOP_TEST,consumers.values());
- sendMessageToNodes(OPCode.STOP_TEST,producers.values());
- super.tearDown();
- }
-
- public void writeStatsToFile() throws Exception
- {
- writer.append(String.valueOf(totalMsgCount)).append(",");
- writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(",");
- writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(",");
- writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(",");
- writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(",");
- writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(",");
- writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(",");
- writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(",");
- writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(",");
- writer.append(config.getDecimalFormat().format(minSystemLatency)).append(",");
- writer.append(config.getDecimalFormat().format(maxSystemLatency));
- if (printStdDev)
- {
- writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
- }
- writer.append("\n");
- writer.flush();
- }
-
- public static void main(String[] args)
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryTestController controller = new MercuryTestController(config);
- controller.run();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
deleted file mode 100644
index a0ba928e1f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-public class MessageFactory
-{
- public static Message createBytesMessage(Session ssn, int size) throws JMSException
- {
- BytesMessage msg = ssn.createBytesMessage();
- msg.writeBytes(createMessagePayload(size).getBytes());
- return msg;
- }
-
- public static Message createTextMessage(Session ssn, int size) throws JMSException
- {
- TextMessage msg = ssn.createTextMessage();
- msg.setText(createMessagePayload(size));
- return msg;
- }
-
- public static String createMessagePayload(int size)
- {
- String msgData = "Qpid Test Message";
-
- StringBuffer buf = new StringBuffer(size);
- int count = 0;
- while (count <= (size - msgData.length()))
- {
- buf.append(msgData);
- count += msgData.length();
- }
- if (count < size)
- {
- buf.append(msgData, 0, size - count);
- }
-
- return buf.toString();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
deleted file mode 100644
index e2d179965b..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ /dev/null
@@ -1,904 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
-import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
-import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-
-/**
- * QpidBench
- *
- */
-
-public class QpidBench
-{
-
- static enum Mode
- {
- PUBLISH, CONSUME, BOTH
- }
-
- private static class Options
- {
- private StringBuilder usage = new StringBuilder("qpid-bench <options>");
-
- void usage(String name, String description, Object def)
- {
- String defval = "";
- if (def != null)
- {
- defval = String.format(" (%s)", def);
- }
- usage.append(String.format("\n %-15s%-14s %s", name, defval, description));
- }
-
- public String broker = "localhost";
- public int port = 5672;
- public long count = 1000000;
- public long window = 100000;
- public long sample = window;
- public int size = 1024;
- public Mode mode = BOTH;
- public boolean timestamp = false;
- public boolean message_id = false;
- public boolean message_cache = false;
- public boolean persistent = false;
- public boolean jms_publish = false;
- public boolean jms_consume = false;
- public boolean help = false;
-
- {
- usage("-b, --broker", "the broker hostname", broker);
- }
-
- public void parse__broker(String b)
- {
- this.broker = b;
- }
-
- public void parse_b(String b)
- {
- parse__broker(b);
- }
-
- {
- usage("-p, --port", "the broker port", port);
- }
-
- public void parse__port(String p)
- {
- this.port = Integer.parseInt(p);
- }
-
- public void parse_p(String p)
- {
- parse__port(p);
- }
-
- {
- usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
- }
-
- public void parse__count(String c)
- {
- this.count = Long.parseLong(c);
- }
-
- public void parse_c(String c)
- {
- parse__count(c);
- }
-
- {
- usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
- }
-
- public void parse__window(String w)
- {
- this.window = Long.parseLong(w);
- }
-
- public void parse_w(String w)
- {
- parse__window(w);
- }
-
- {
- usage("--sample", "print stats after this many messages, 0 disables", sample);
- }
-
- public void parse__sample(String s)
- {
- this.sample = Long.parseLong(s);
- }
-
- {
- usage("-i, --interval", "sets both --window and --sample", window);
- }
-
- public void parse__interval(String i)
- {
- this.window = Long.parseLong(i);
- this.sample = window;
- }
-
- public void parse_i(String i)
- {
- parse__interval(i);
- }
-
- {
- usage("-s, --size", "the message size", size);
- }
-
- public void parse__size(String s)
- {
- this.size = Integer.parseInt(s);
- }
-
- public void parse_s(String s)
- {
- parse__size(s);
- }
-
- {
- usage("-m, --mode", "one of publish, consume, or both", mode);
- }
-
- public void parse__mode(String m)
- {
- if (m.equalsIgnoreCase("publish"))
- {
- this.mode = PUBLISH;
- }
- else if (m.equalsIgnoreCase("consume"))
- {
- this.mode = CONSUME;
- }
- else if (m.equalsIgnoreCase("both"))
- {
- this.mode = BOTH;
- }
- else
- {
- throw new IllegalArgumentException
- ("must be one of 'publish', 'consume', or 'both'");
- }
- }
-
- public void parse_m(String m)
- {
- parse__mode(m);
- }
-
- {
- usage("--timestamp", "set timestamps on each message if true", timestamp);
- }
-
- public void parse__timestamp(String t)
- {
- this.timestamp = Boolean.parseBoolean(t);
- }
-
- {
- usage("--mesage-id", "set the message-id on each message if true", message_id);
- }
-
- public void parse__message_id(String m)
- {
- this.message_id = Boolean.parseBoolean(m);
- }
-
- {
- usage("--message-cache", "reuse the same message for each send if true", message_cache);
- }
-
- public void parse__message_cache(String c)
- {
- this.message_cache = Boolean.parseBoolean(c);
- }
-
- {
- usage("--persistent", "set the delivery-mode to persistent if true", persistent);
- }
-
- public void parse__persistent(String p)
- {
- this.persistent = Boolean.parseBoolean(p);
- }
-
- {
- usage("--jms-publish", "use the jms client for publish", jms_publish);
- }
-
- public void parse__jms_publish(String jp)
- {
- this.jms_publish = Boolean.parseBoolean(jp);
- }
-
- {
- usage("--jms-consume", "use the jms client for consume", jms_consume);
- }
-
- public void parse__jms_consume(String jc)
- {
- this.jms_consume = Boolean.parseBoolean(jc);
- }
-
- {
- usage("--jms", "sets both --jms-publish and --jms-consume", false);
- }
-
- public void parse__jms(String j)
- {
- this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
- }
-
- {
- usage("-h, --help", "prints this message", null);
- }
-
- public void parse__help()
- {
- this.help = true;
- }
-
- public void parse_h()
- {
- parse__help();
- }
-
- public String parse(String ... args)
- {
- Class klass = getClass();
- List<String> arguments = new ArrayList<String>();
- for (int i = 0; i < args.length; i++)
- {
- String option = args[i];
-
- if (!option.startsWith("-"))
- {
- arguments.add(option);
- continue;
- }
-
- String method = "parse" + option.replace('-', '_');
- try
- {
- try
- {
- Method parser = klass.getMethod(method);
- parser.invoke(this);
- }
- catch (NoSuchMethodException e)
- {
- try
- {
- Method parser = klass.getMethod(method, String.class);
-
- String value = null;
- if (i + 1 < args.length)
- {
- value = args[i+1];
- i++;
- }
- else
- {
- return option + " requires a value";
- }
-
- parser.invoke(this, value);
- }
- catch (NoSuchMethodException e2)
- {
- return "no such option: " + option;
- }
- }
- }
- catch (InvocationTargetException e)
- {
- Throwable t = e.getCause();
- return String.format
- ("error parsing %s: %s: %s", option, t.getClass().getName(),
- t.getMessage());
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException
- ("unable to access parse method: " + option, e);
- }
- }
-
- return parseArguments(arguments);
- }
-
- public String parseArguments(List<String> arguments)
- {
- if (arguments.size() > 0)
- {
- String args = arguments.toString();
- return "unrecognized arguments: " + args.substring(1, args.length() - 1);
- }
- else
- {
- return null;
- }
- }
-
- public String toString()
- {
- Class klass = getClass();
- Field[] fields = klass.getFields();
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < fields.length; i++)
- {
- if (i > 0)
- {
- str.append("\n");
- }
-
- String name = fields[i].getName();
- str.append(name);
- str.append(" = ");
- Object value;
- try
- {
- value = fields[i].get(this);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException
- ("unable to access field: " + name, e);
- }
- str.append(value);
- }
-
- return str.toString();
- }
- }
-
- public static final void main(String[] args) throws Exception
- {
- final Options opts = new Options();
- String error = opts.parse(args);
- if (error != null)
- {
- System.err.println(error);
- System.exit(-1);
- return;
- }
-
- if (opts.help)
- {
- System.out.println(opts.usage);
- return;
- }
-
- System.out.println(opts);
-
- switch (opts.mode)
- {
- case CONSUME:
- case BOTH:
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- if (opts.jms_consume)
- {
- jms_consumer(opts);
- }
- else
- {
- native_consumer(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Consumer Completed");
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- break;
- }
-
- switch (opts.mode)
- {
- case PUBLISH:
- case BOTH:
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- if (opts.jms_publish)
- {
- jms_publisher(opts);
- }
- else
- {
- native_publisher(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Producer Completed");
- }
- };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating publisher thread",e);
- }
- t.start();
- break;
- }
- }
-
- private static enum Column
- {
- LEFT, RIGHT
- }
-
- private static final void sample(Options opts, Column col, String name, long count,
- long start, long time, long lastTime)
- {
- String pfx = "";
- String sfx = "";
- if (opts.mode == BOTH)
- {
- if (col == Column.RIGHT)
- {
- pfx = " -- ";
- }
- else
- {
- sfx = " --";
- }
- }
-
- if (count == 0)
- {
- String stats = String.format("%s: %tc", name, start);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- return;
- }
-
- double cumulative = 1000 * (double) count / (double) (time - start);
- double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
-
- String stats = String.format
- ("%s: %d %.2f %.2f", name, count, cumulative, interval);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- }
-
- private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
- {
- String url = String.format
- ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
- opts.broker, opts.port);
- return new AMQConnection(url);
- }
-
- private static final void jms_publisher(Options opts) throws Exception
- {
- javax.jms.Connection conn = getJMSConnection(opts);
-
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageProducer prod = ssn.createProducer(dest);
- MessageConsumer cons = ssn.createConsumer(echo_dest);
- prod.setDisableMessageID(!opts.message_id);
- prod.setDisableMessageTimestamp(!opts.timestamp);
- prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < opts.size; i++)
- {
- str.append((char) (i % 128));
- }
-
- String body = str.toString();
-
- TextMessage cached = ssn.createTextMessage();
- cached.setText(body);
-
- conn.start();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- Message echo = cons.receive();
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
- lastTime = time;
- }
-
- TextMessage m;
- if (opts.message_cache)
- {
- m = cached;
- }
- else
- {
- m = ssn.createTextMessage();
- m.setText(body);
- }
-
- prod.send(m);
- count++;
- }
-
- conn.close();
- }
-
- private static final void jms_consumer(final Options opts) throws Exception
- {
- final javax.jms.Connection conn = getJMSConnection(opts);
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageConsumer cons = ssn.createConsumer(dest);
- final MessageProducer prod = ssn.createProducer(echo_dest);
- prod.setDisableMessageID(true);
- prod.setDisableMessageTimestamp(true);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- final TextMessage echo = ssn.createTextMessage();
- echo.setText("ECHO");
-
- final Object done = new Object();
- cons.setMessageListener(new MessageListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- public void onMessage(Message m)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- try
- {
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- prod.send(echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
- lastTime = time;
- }
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- conn.start();
- synchronized (done)
- {
- done.wait();
- }
- conn.close();
- }
-
- private static final org.apache.qpid.transport.Connection getConnection
- (Options opts)
- {
- org.apache.qpid.transport.Connection conn =
- new org.apache.qpid.transport.Connection();
- conn.connect(opts.broker, opts.port, null, "guest", "guest", false, null);
- return conn;
- }
-
- private static abstract class NativeListener implements SessionListener
- {
-
- public void opened(org.apache.qpid.transport.Session ssn) {}
-
- public void resumed(org.apache.qpid.transport.Session ssn) {}
-
- public void exception(org.apache.qpid.transport.Session ssn,
- SessionException exc)
- {
- exc.printStackTrace();
- }
-
- public void closed(org.apache.qpid.transport.Session ssn) {}
-
- }
-
- private static final void native_publisher(Options opts) throws Exception
- {
- final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(xfr);
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- MessageProperties cached_mp = new MessageProperties();
- DeliveryProperties cached_dp = new DeliveryProperties();
- cached_dp.setRoutingKey("test-queue");
- cached_dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- int size = opts.size;
- ByteBuffer body = ByteBuffer.allocate(size);
- for (int i = 0; i < size; i++)
- {
- body.put((byte) i);
- }
- body.flip();
-
- ssn.invoke(new MessageSubscribe()
- .queue("echo-queue")
- .destination("echo-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- UUIDGen gen = UUIDs.newGenerator();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- synchronized (echos)
- {
- while (echos[0] < (count/opts.window))
- {
- echos.wait();
- }
- }
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
- lastTime = time;
- }
-
- MessageProperties mp;
- DeliveryProperties dp;
- if (opts.message_cache)
- {
- mp = cached_mp;
- dp = cached_dp;
- }
- else
- {
- mp = new MessageProperties();
- dp = new DeliveryProperties();
- dp.setRoutingKey("test-queue");
- dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- }
-
- if (opts.message_id)
- {
- mp.setMessageId(gen.generate());
- }
-
- if (opts.timestamp)
- {
- dp.setTimestamp(System.currentTimeMillis());
- }
-
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp), body.slice());
- count++;
- }
-
- ssn.messageCancel("echo-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
- private static final void native_consumer(final Options opts) throws Exception
- {
- final DeliveryProperties dp = new DeliveryProperties();
- final byte[] echo = new byte[0];
- dp.setRoutingKey("echo-queue");
- dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- final MessageProperties mp = new MessageProperties();
- final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(xfr);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- ssn.invoke(new MessageSubscribe()
- .queue("test-queue")
- .destination("test-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- synchronized (done)
- {
- done.wait();
- }
-
- ssn.messageCancel("test-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
deleted file mode 100644
index 4092f0d59d..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.tools.report.BasicReporter;
-import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QpidReceive implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class);
- private final CountDownLatch testCompleted = new CountDownLatch(1);
-
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageConsumer consumer;
- private boolean transacted = false;
- private boolean isRollback = false;
- private int txSize = 0;
- private int rollbackFrequency = 0;
- private int ackFrequency = 0;
- private int expected = 0;
- private int received = 0;
- private Reporter report;
- private TestConfiguration config;
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- con.start();
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else if (config.getAckFrequency() > 0)
- {
- session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- if (_logger.isDebugEnabled())
- {
- System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
- }
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
- isRollback = config.getRollbackFrequency() > 0;
- rollbackFrequency = config.getRollbackFrequency();
- ackFrequency = config.getAckFrequency();
-
- _logger.debug("Ready address : " + config.getReadyAddress());
- if (config.getReadyAddress() != null)
- {
- MessageProducer prod = session.createProducer(AMQDestination
- .createDestination(config.getReadyAddress(), false));
- prod.send(session.createMessage());
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending message to ready address " + prod.getDestination());
- }
- }
- }
-
- public void resetCounters()
- {
- received = 0;
- expected = 0;
- report.clear();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage &&
- TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
- {
- testCompleted.countDown();
- return;
- }
-
- received++;
- report.message(msg);
-
- if (config.isPrintHeaders())
- {
- System.out.println(((AbstractJMSMessage)msg).toHeaderString());
- }
-
- if (config.isPrintContent())
- {
- System.out.println(((AbstractJMSMessage)msg).toBodyString());
- }
-
- if (transacted && (received % txSize == 0))
- {
- if (isRollback && (received % rollbackFrequency == 0))
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- else if (ackFrequency > 0)
- {
- msg.acknowledge();
- }
-
- if (received >= expected)
- {
- testCompleted.countDown();
- }
-
- }
- catch(Exception e)
- {
- _logger.error("Error when receiving messages",e);
- }
- }
-
- public void waitforCompletion(int expected) throws Exception
- {
- this.expected = expected;
- testCompleted.await();
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(ThroughputAndLatency.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader());
- Destination dest = AMQDestination.createDestination(config.getAddress(), false);
- QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
- receiver.setUp();
- receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS());
- if (config.isReportTotal())
- {
- reporter.report();
- }
- receiver.tearDown();
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
deleted file mode 100644
index 58a643726c..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
-import org.apache.qpid.tools.report.BasicReporter;
-import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.Throughput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QpidSend
-{
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageProducer producer;
- private MessageType msgType;
- private Message msg;
- private Object payload;
- private List<Object> payloads;
- private boolean cacheMsg = false;
- private boolean randomMsgSize = false;
- private boolean durable = false;
- private Random random;
- private int msgSizeRange = 1024;
- private int totalMsgCount = 0;
- private boolean rateLimitProducer = false;
- private boolean transacted = false;
- private int txSize = 0;
-
- private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
- Reporter report;
- TestConfiguration config;
-
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- con.start();
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- durable = config.isDurable();
- rateLimitProducer = config.getSendRate() > 0 ? true : false;
- if (_logger.isDebugEnabled() && rateLimitProducer)
- {
- _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
- }
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
-
- msgType = MessageType.getType(config.getMessageType());
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (config.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(config.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (config.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = config.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(config.getMsgSize());
- }
-
- producer = session.createProducer(dest);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
- }
- producer.setDisableMessageID(config.isDisableMessageID());
- //we add a separate timestamp to allow interoperability with other clients.
- producer.setDisableMessageTimestamp(true);
- if (config.getTTL() > 0)
- {
- producer.setTimeToLive(config.getTTL());
- }
- if (config.getPriority() > 0)
- {
- producer.setPriority(config.getPriority());
- }
- }
-
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
-
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
-
- public void commit() throws Exception
- {
- session.commit();
- }
-
- public void send() throws Exception
- {
- send(config.getMsgCount());
- }
-
- public void send(int count) throws Exception
- {
- int sendRate = config.getSendRate();
- if (rateLimitProducer)
- {
- int iterations = count/sendRate;
- int remainder = count%sendRate;
- for (int i=0; i < iterations; i++)
- {
- long iterationStart = System.currentTimeMillis();
- sendMessages(sendRate);
- long elapsed = System.currentTimeMillis() - iterationStart;
- long diff = Clock.SEC - elapsed;
- if (diff > 0)
- {
- // We have sent more messages in a sec than specified by the rate.
- Thread.sleep(diff);
- }
- }
- sendMessages(remainder);
- }
- else
- {
- sendMessages(count);
- }
- }
-
- private void sendMessages(int count) throws Exception
- {
- boolean isTimestamp = !config.isDisableTimestamp();
- long s = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- if (isTimestamp)
- {
- msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis());
- }
- producer.send(msg);
- //report.message(msg);
- totalMsgCount++;
-
- if ( transacted && ((totalMsgCount) % txSize == 0))
- {
- session.commit();
- }
- }
- long e = System.currentTimeMillis() - s;
- //System.out.println("Rate : " + totalMsgCount/e);
- }
-
- public void resetCounters()
- {
- totalMsgCount = 0;
- report.clear();
- }
-
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createTextMessage(TestConfiguration.EOS);
- producer.send(msg);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(Throughput.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader()
- );
- Destination dest = AMQDestination.createDestination(config.getAddress(), false);
- QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
- sender.setUp();
- sender.send();
- if (config.getSendEOS() > 0)
- {
- sender.sendEndMessage();
- }
- if (config.isReportTotal())
- {
- reporter.report();
- }
- sender.tearDown();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
deleted file mode 100644
index 790ed80e5f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
+++ /dev/null
@@ -1,667 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.qpid.tools.util.ArgumentsParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-public class RestStressTestClient
-{
-
- public static void main(String[] args) throws Exception
- {
- ArgumentsParser parser = new ArgumentsParser();
- Arguments arguments;
- try
- {
- arguments = parser.parse(args, Arguments.class);
- arguments.validate();
- }
- catch(IllegalArgumentException e)
- {
- System.out.println("Invalid argument:" + e.getMessage());
- parser.usage(Arguments.class, Arguments.REQUIRED);
- System.out.println("\nRun examples:" );
- System.out.println(" Using Basic authentication:" );
- System.out.println(" java -cp qpid-tools.jar:commons-codec.jar:jackson-core.jar:jackson-mapper.jar \\" );
- System.out.println(" -Djavax.net.ssl.trustStore=java_client_truststore.jks \\");
- System.out.println(" -Djavax.net.ssl.trustStorePassword=password \\");
- System.out.println(" org.apache.qpid.tools.RestStressTestClient \\");
- System.out.println(" repetitions=10 brokerUrl=https://localhost:8081 username=admin password=admin \\");
- System.out.println(" virtualHost=default virtualHostNode=default createQueue=true bindQueue=true \\");
- System.out.println(" deleteQueue=true uniqueQueues=true queueName=boo exchangeName=amq.fanout" );
- System.out.println(" Using CRAM-MD5 SASL authentication:" );
- System.out.println(" java -cp qpid-tools.jar:commons-codec.jar:jackson-core.jar:jackson-mapper.jar \\" );
- System.out.println(" org.apache.qpid.tools.RestStressTestClient saslMechanism=CRAM-MD5 \\");
- System.out.println(" repetitions=10 brokerUrl=http://localhost:8080 username=admin password=admin \\");
- System.out.println(" virtualHost=default virtualHostNode=default createQueue=true bindQueue=true \\");
- System.out.println(" deleteQueue=true uniqueQueues=true queueName=boo exchangeName=amq.fanout" );
- return;
- }
-
- RestStressTestClient client = new RestStressTestClient();
- client.run(arguments);
- }
-
- public void run(Arguments arguments) throws IOException
- {
- log(arguments.toString());
- for (int i = 0; i < arguments.getRepetitions(); i++)
- {
- runIteration(arguments, i);
- }
- }
-
- private void runIteration(Arguments arguments, int iteration) throws IOException
- {
- log("Iteration " + iteration);
-
- RestClient client = new RestClient(arguments.getBrokerUrl(), arguments.getUsername(), arguments.getPassword(), arguments.getSaslMechanism());
- client.authenticateIfSaslAuthenticationRequested();
- try
- {
- List<Map<String, Object>> brokerData = client.get("/api/latest/broker?depth=0");
- log(" Connected to broker " + brokerData.get(0).get("name"));
- createAndBindQueueIfRequired(arguments, client, iteration);
- }
- finally
- {
- if (arguments.isLogout())
- {
- client.logout();
- }
- }
- }
-
- private void log(String logMessage)
- {
- System.out.println(logMessage);
- }
-
- private void createAndBindQueueIfRequired(Arguments arguments, RestClient client, int iteration) throws IOException
- {
- if (arguments.isCreateQueue())
- {
- String virtualHostNode = arguments.getVirtualHostNode();
- String virtualHost = arguments.getVirtualHost();
- String queueName = arguments.getQueueName();
-
- if (queueName == null)
- {
- queueName = "temp-queue-" + System.nanoTime();
- }
- else if (arguments.isUniqueQueues())
- {
- queueName = queueName + "-" + iteration;
- }
-
- createQueue(client, virtualHostNode, virtualHost, queueName);
-
- if (arguments.isBindQueue())
- {
- bindQueue(client, virtualHostNode, virtualHost, queueName, arguments.getExchangeName());
- }
-
- if (arguments.isDeleteQueue())
- {
- deleteQueue(client, virtualHostNode, virtualHost, queueName);
- }
- }
- }
-
- private void createQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName) throws IOException
- {
- log(" Create queue " + queueName);
-
- String queueUrl = getQueueServiceUrl(virtualHostNode, virtualHost, queueName);
- Map<String, Object> queueData = new HashMap<>();
- queueData.put("name", queueName);
- queueData.put("durable", true);
-
- int result = client.put(queueUrl, queueData);
-
- if (result != RestClient.RESPONSE_PUT_CREATE_OK)
- {
- throw new RuntimeException("Failure to create queue " + queueName);
- }
- }
-
- private String getQueueServiceUrl(String virtualHostNode, String virtualHost, String queueName)
- {
- return "/api/latest/queue/" + virtualHostNode + "/" + virtualHost + "/" + queueName;
- }
-
- private void deleteQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName) throws IOException
- {
- log(" Delete queue " + queueName);
- int result = client.delete(getQueueServiceUrl(virtualHostNode, virtualHost, queueName));
- if (result != RestClient.RESPONSE_PUT_UPDATE_OK)
- {
- throw new RuntimeException("Failure to delete queue " + queueName);
- }
- }
-
- private void bindQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName, String exchangeName)
- throws IOException
- {
- if (exchangeName == null)
- {
- exchangeName = "amq.direct";
- }
-
- log(" Bind queue " + queueName + " to " + exchangeName + " using binding key " + queueName);
-
- String bindingUrl = "/api/latest/binding/" + virtualHostNode + "/" + virtualHost + "/" + exchangeName + "/" + queueName + "/" + queueName;
-
- Map<String, Object> bindingData = new HashMap<>();
- bindingData.put("name", queueName);
- bindingData.put("queue", queueName);
- bindingData.put("exchange", exchangeName);
-
- int result = client.put(bindingUrl, bindingData);
-
- if (result != RestClient.RESPONSE_PUT_CREATE_OK)
- {
- throw new RuntimeException("Failure to bind queue " + queueName + " to " + exchangeName);
- }
- }
-
- public static class RestClient
- {
- private static final TypeReference<List<LinkedHashMap<String, Object>>> TYPE_LIST_OF_LINKED_HASH_MAPS = new TypeReference<List<LinkedHashMap<String, Object>>>()
- {
- };
-
- private static final TypeReference<LinkedHashMap<String, Object>> TYPE_HASH_MAP = new TypeReference<LinkedHashMap<String, Object>>()
- {
- };
-
- public static final int RESPONSE_PUT_CREATE_OK = 201;
- public static final int RESPONSE_PUT_UPDATE_OK = 200;
- public static final int RESPONSE_OK = 200;
- public static final int RESPONSE_AUTHENTICATION_REQUIRED = 401;
-
- private final ObjectMapper _mapper;
- private final String _brokerUrl;
- private final String _username;
- private final String _password;
- private final String _saslMechanism;
- private final String _authorizationHeader;
-
- private List<String> _cookies;
-
- public RestClient(String brokerUrl, String username, String password, String saslMechanism)
- {
- _mapper = new ObjectMapper();
- _brokerUrl = brokerUrl;
- _username = username;
- _password = password;
- _saslMechanism = saslMechanism;
-
- if (saslMechanism == null)
- {
- _authorizationHeader = "Basic " + new String(new Base64().encode((_username + ":" + _password).getBytes()));
- }
- else
- {
- _authorizationHeader = null;
- }
- }
-
- public List<Map<String, Object>> get(String restServiceUrl) throws IOException
- {
- HttpURLConnection connection = createConnection("GET", restServiceUrl, _cookies);
- try
- {
- connection.connect();
- byte[] data = readConnectionInputStream(connection);
- checkResponseCode(connection);
- return _mapper.readValue(new ByteArrayInputStream(data), TYPE_LIST_OF_LINKED_HASH_MAPS);
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- public int put(String restServiceUrl, Map<String, Object> attributes) throws IOException
- {
- HttpURLConnection connection = createConnection("PUT", restServiceUrl, _cookies);
- try
- {
- connection.connect();
- if (attributes != null)
- {
- ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(connection.getOutputStream(), attributes);
- }
- checkResponseCode(connection);
- return connection.getResponseCode();
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- public int delete(String restServiceUrl) throws IOException
- {
- HttpURLConnection connection = createConnection("DELETE", restServiceUrl, _cookies);
- try
- {
- checkResponseCode(connection);
- return connection.getResponseCode();
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- public int post(String restServiceUrl, Map<String, String> postData) throws IOException
- {
- HttpURLConnection connection = createConnectionAndPostData(restServiceUrl, postData, _cookies);
- try
- {
- checkResponseCode(connection);
- return connection.getResponseCode();
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- private HttpURLConnection createConnectionAndPostData(String restServiceUrl, Map<String, String> postData, List<String> cookies) throws IOException
- {
- String postParameters = getPostDataString(postData);
- HttpURLConnection connection = createConnection("POST", restServiceUrl, cookies);
- try
- {
- OutputStream os = connection.getOutputStream();
- os.write(postParameters.getBytes());
- os.flush();
- }
- catch (IOException e)
- {
- connection.disconnect();
- throw e;
- }
- return connection;
- }
-
- private void checkResponseCode(HttpURLConnection connection) throws IOException
- {
- if (connection.getResponseCode() == RESPONSE_AUTHENTICATION_REQUIRED)
- {
- _cookies = null;
- throw new IllegalArgumentException("Authentication is required");
- }
- }
-
- private String getPostDataString(Map<String, String> postData)
- {
- StringBuilder sb = new StringBuilder();
- if (postData != null)
- {
- Iterator<String> iterator = postData.keySet().iterator();
- while (iterator.hasNext())
- {
- String key = iterator.next();
- sb.append(key + "=" + postData.get(key));
- if (iterator.hasNext())
- {
- sb.append("&");
- }
- }
- }
- return sb.toString();
- }
-
- private HttpURLConnection createConnection(String method, String restServiceUrl, List<String> cookies) throws IOException
- {
- HttpURLConnection httpConnection = (HttpURLConnection) new URL(_brokerUrl + restServiceUrl).openConnection();
- if (cookies != null)
- {
- for (String cookie : cookies)
- {
- httpConnection.addRequestProperty("Cookie", cookie.split(";", 2)[0]);
- }
- }
- if (_saslMechanism == null)
- {
- httpConnection.setRequestProperty("Authorization", _authorizationHeader);
- }
-
- httpConnection.setDoOutput(true);
- httpConnection.setRequestMethod(method);
- return httpConnection;
- }
-
- public void authenticateIfSaslAuthenticationRequested() throws IOException
- {
- if (_saslMechanism == null)
- {
- // basic authentication will be used with each request
- }
- else if ("CRAM-MD5".equals(_saslMechanism))
- {
- _cookies = performCramMD5Authentication();
- }
- else
- {
- throw new IllegalArgumentException("Unsupported SASL mechanism :" + _saslMechanism);
- }
- }
-
-
- public void logout() throws IOException
- {
- if (_cookies != null)
- {
- HttpURLConnection connection = createConnection("GET", "/service/logout", _cookies);
- try
- {
- connection.connect();
- _cookies = null;
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- //TODO: we need to track sessions for basic auth in order to logout those
- }
-
- private List<String> performCramMD5Authentication() throws IOException
- {
- // request the challenge for CRAM-MD5
- HttpURLConnection connection = createConnectionAndPostData("/service/sasl", Collections.singletonMap("mechanism", "CRAM-MD5"), null);
- try
- {
- List<String> cookies = connection.getHeaderFields().get("Set-Cookie");
-
- // get response
- byte[] data = readConnectionInputStream(connection);
- Map<String, Object> response = _mapper.readValue(new ByteArrayInputStream(data), TYPE_HASH_MAP);
- String challenge = (String) response.get("challenge");
-
- // generate the authentication response for the received challenge
- String responseData = generateResponseForChallengeAndCredentials(challenge, _username, _password);
-
- Map<String, String> saslResponse = new HashMap<>();
- saslResponse.put("id", (String)response.get("id"));
- saslResponse.put("response", responseData);
-
- HttpURLConnection authenticateConnection = createConnectionAndPostData("/service/sasl", saslResponse, cookies);
- try
- {
- int code = authenticateConnection.getResponseCode();
- if (code != RESPONSE_OK)
- {
- throw new RuntimeException("Authentication failed");
- }
- else
- {
- return cookies;
- }
- }
- finally
- {
- authenticateConnection.disconnect();
- }
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- private String generateResponseForChallengeAndCredentials(String challenge, String username, String password)
- {
- try
- {
- byte[] challengeBytes = Base64.decodeBase64(challenge);
-
- String macAlgorithm = "HmacMD5";
- Mac mac = Mac.getInstance(macAlgorithm);
- mac.init(new SecretKeySpec(password.getBytes("UTF-8"), macAlgorithm));
- final byte[] messageAuthenticationCode = mac.doFinal(challengeBytes);
- String responseAsString = username + " " + toHex(messageAuthenticationCode);
- byte[] responseBytes = responseAsString.getBytes();
- return Base64.encodeBase64String(responseBytes);
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unexpected exception", e);
- }
- }
-
- private String toHex(byte[] data)
- {
- StringBuffer hash = new StringBuffer();
- for (int i = 0; i < data.length; i++)
- {
- String hex = Integer.toHexString(0xFF & data[i]);
- if (hex.length() == 1)
- {
- hash.append('0');
- }
- hash.append(hex);
- }
- return hash.toString();
- }
-
- private byte[] readConnectionInputStream(HttpURLConnection connection) throws IOException
- {
- if (connection.getResponseCode() == RESPONSE_AUTHENTICATION_REQUIRED)
- {
- _cookies = null;
- }
- InputStream is = connection.getInputStream();
- try(ByteArrayOutputStream baos = new ByteArrayOutputStream())
- {
- byte[] buffer = new byte[1024];
- int len;
- while ((len = is.read(buffer)) != -1)
- {
- baos.write(buffer, 0, len);
- }
- return baos.toByteArray();
- }
- }
-
- }
-
- public static class Arguments
- {
- private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("brokerUrl", "username", "password"));
-
- private String brokerUrl = null;
- private String username = null;
- private String password = null;
- private String saslMechanism = null;
-
- private String virtualHostNode = null;
- private String virtualHost = null;
- private String queueName = null;
- private String exchangeName = null;
-
- private int repetitions = 1;
-
- private boolean createQueue = false;
- private boolean deleteQueue = false;
- private boolean uniqueQueues = false;
- private boolean bindQueue = false;
-
- private boolean logout = true;
-
- public Arguments()
- {
- }
-
- public void validate()
- {
- if (brokerUrl == null || brokerUrl.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'brokerUrl' is not specified");
- }
-
- if (username == null || username.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'username' is not specified");
- }
-
- if (password == null || password.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'password' is not specified");
- }
-
- if (createQueue)
- {
- if (virtualHostNode == null || virtualHostNode.equals(""))
- {
- throw new IllegalArgumentException("Virtual host node name needs to be specified for queue creation");
- }
-
- if (virtualHost == null || virtualHost.equals(""))
- {
- throw new IllegalArgumentException("Virtual host name needs to be specified for queue creation");
- }
- }
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public boolean isCreateQueue()
- {
- return createQueue;
- }
-
- public boolean isDeleteQueue()
- {
- return deleteQueue;
- }
-
- public boolean isUniqueQueues()
- {
- return uniqueQueues;
- }
-
- public String getQueueName()
- {
- return queueName;
- }
-
- public boolean isBindQueue()
- {
- return bindQueue;
- }
-
- public String getExchangeName()
- {
- return exchangeName;
- }
-
- public String getVirtualHostNode()
- {
- return virtualHostNode;
- }
-
-
- public int getRepetitions()
- {
- return repetitions;
- }
-
- public String getBrokerUrl()
- {
- return brokerUrl;
- }
-
- public String getSaslMechanism()
- {
- return saslMechanism;
- }
-
- public boolean isLogout()
- {
- return logout;
- }
-
- @Override
- public String toString()
- {
- return "Arguments{" +
- "brokerUrl='" + brokerUrl + '\'' +
- ", username='" + username + '\'' +
- ", password='" + password + '\'' +
- ", saslMechanism='" + saslMechanism + '\'' +
- ", virtualHostNode='" + virtualHostNode + '\'' +
- ", virtualHost='" + virtualHost + '\'' +
- ", queueName='" + queueName + '\'' +
- ", exchangeName='" + exchangeName + '\'' +
- ", repetitions=" + repetitions +
- ", createQueue=" + createQueue +
- ", deleteQueue=" + deleteQueue +
- ", uniqueQueues=" + uniqueQueues +
- ", bindQueue=" + bindQueue +
- ", logout=" + logout +
- '}';
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
deleted file mode 100644
index 6494a2e800..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-
-public class StressTestClient
-{
- private static final String QUEUE_NAME_PREFIX = "BURL:direct://amq.direct//stress-test-queue";
- private static final String DURABLE_SUFFIX = "?durable='true'";
-
- public static final String CONNECTIONS_ARG = "connections";
- public static final String SESSIONS_ARG = "sessions";
- public static final String CONSUME_IMMEDIATELY_ARG = "consumeImmediately";
- public static final String CONSUMERS_ARG = "consumers";
- public static final String CLOSE_CONSUMERS_ARG = "closeconsumers";
- public static final String PRODUCERS_ARG = "producers";
- public static final String MESSAGE_COUNT_ARG = "messagecount";
- public static final String MESSAGE_SIZE_ARG = "size";
- public static final String SUFFIX_ARG = "suffix";
- public static final String REPETITIONS_ARG = "repetitions";
- public static final String PERSISTENT_ARG = "persistent";
- public static final String RANDOM_ARG = "random";
- public static final String TIMEOUT_ARG = "timeout";
- public static final String DELAYCLOSE_ARG = "delayclose";
- public static final String REPORT_MOD_ARG = "reportmod";
- public static final String LOW_PREFETCH_ARG = "lowprefetch";
- public static final String TRANSACTED_ARG = "transacted";
- public static final String TX_BATCH_ARG = "txbatch";
-
- public static final String CONNECTIONS_DEFAULT = "1";
- public static final String SESSIONS_DEFAULT = "1";
- public static final String CONSUME_IMMEDIATELY_DEFAULT = "true";
- public static final String CLOSE_CONSUMERS_DEFAULT = "true";
- public static final String PRODUCERS_DEFAULT = "1";
- public static final String CONSUMERS_DEFAULT = "1";
- public static final String MESSAGE_COUNT_DEFAULT = "1";
- public static final String MESSAGE_SIZE_DEFAULT = "256";
- public static final String SUFFIX_DEFAULT = "";
- public static final String REPETITIONS_DEFAULT = "1";
- public static final String PERSISTENT_DEFAULT = "false";
- public static final String RANDOM_DEFAULT = "true";
- public static final String TIMEOUT_DEFAULT = "30000";
- public static final String DELAYCLOSE_DEFAULT = "0";
- public static final String REPORT_MOD_DEFAULT = "1";
- public static final String LOW_PREFETCH_DEFAULT = "false";
- public static final String TRANSACTED_DEFAULT = "false";
- public static final String TX_BATCH_DEFAULT = "1";
-
- private static final String CLASS = "StressTestClient";
-
- public static void main(String[] args)
- {
- Map<String,String> options = new HashMap<>();
- options.put(CONNECTIONS_ARG, CONNECTIONS_DEFAULT);
- options.put(SESSIONS_ARG, SESSIONS_DEFAULT);
- options.put(CONSUME_IMMEDIATELY_ARG, CONSUME_IMMEDIATELY_DEFAULT);
- options.put(PRODUCERS_ARG, PRODUCERS_DEFAULT);
- options.put(CONSUMERS_ARG, CONSUMERS_DEFAULT);
- options.put(CLOSE_CONSUMERS_ARG, CLOSE_CONSUMERS_DEFAULT);
- options.put(MESSAGE_COUNT_ARG, MESSAGE_COUNT_DEFAULT);
- options.put(MESSAGE_SIZE_ARG, MESSAGE_SIZE_DEFAULT);
- options.put(SUFFIX_ARG, SUFFIX_DEFAULT);
- options.put(REPETITIONS_ARG, REPETITIONS_DEFAULT);
- options.put(PERSISTENT_ARG, PERSISTENT_DEFAULT);
- options.put(RANDOM_ARG, RANDOM_DEFAULT);
- options.put(TIMEOUT_ARG, TIMEOUT_DEFAULT);
- options.put(DELAYCLOSE_ARG, DELAYCLOSE_DEFAULT);
- options.put(REPORT_MOD_ARG, REPORT_MOD_DEFAULT);
- options.put(LOW_PREFETCH_ARG, LOW_PREFETCH_DEFAULT);
- options.put(TRANSACTED_ARG, TRANSACTED_DEFAULT);
- options.put(TX_BATCH_ARG, TX_BATCH_DEFAULT);
-
- if(args.length == 1 &&
- (args[0].equals("-h") || args[0].equals("--help") || args[0].equals("help")))
- {
- System.out.println("arg=value options: \n" + options.keySet());
- return;
- }
-
- parseArgumentsIntoConfig(options, args);
-
- StressTestClient testClient = new StressTestClient();
- testClient.runTest(options);
- }
-
- public static void parseArgumentsIntoConfig(Map<String, String> initialValues, String[] args)
- {
- for(String arg: args)
- {
- String[] splitArg = arg.split("=");
- if(splitArg.length != 2)
- {
- throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg);
- }
-
- if(initialValues.put(splitArg[0], splitArg[1]) == null)
- {
- throw new IllegalArgumentException("not a valid configuration property: " + arg);
- }
- }
- }
-
-
- private void runTest(Map<String,String> options)
- {
- int numConnections = Integer.parseInt(options.get(CONNECTIONS_ARG));
- int numSessions = Integer.parseInt(options.get(SESSIONS_ARG));
- int numProducers = Integer.parseInt(options.get(PRODUCERS_ARG));
- int numConsumers = Integer.parseInt(options.get(CONSUMERS_ARG));
- boolean closeConsumers = Boolean.valueOf(options.get(CLOSE_CONSUMERS_ARG));
- boolean consumeImmediately = Boolean.valueOf(options.get(CONSUME_IMMEDIATELY_ARG));
- int numMessage = Integer.parseInt(options.get(MESSAGE_COUNT_ARG));
- int messageSize = Integer.parseInt(options.get(MESSAGE_SIZE_ARG));
- int repetitions = Integer.parseInt(options.get(REPETITIONS_ARG));
- String queueString = QUEUE_NAME_PREFIX + options.get(SUFFIX_ARG) + DURABLE_SUFFIX;
- int deliveryMode = Boolean.valueOf(options.get(PERSISTENT_ARG)) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
- boolean random = Boolean.valueOf(options.get(RANDOM_ARG));
- long recieveTimeout = Long.parseLong(options.get(TIMEOUT_ARG));
- long delayClose = Long.parseLong(options.get(DELAYCLOSE_ARG));
- int reportingMod = Integer.parseInt(options.get(REPORT_MOD_ARG));
- boolean lowPrefetch = Boolean.valueOf(options.get(LOW_PREFETCH_ARG));
- boolean transacted = Boolean.valueOf(options.get(TRANSACTED_ARG));
- int txBatch = Integer.parseInt(options.get(TX_BATCH_ARG));
-
- System.out.println(CLASS + ": Using options: " + options);
-
- System.out.println(CLASS + ": Creating message payload of " + messageSize + " (bytes)");
- byte[] sentBytes = generateMessage(random, messageSize);
-
- try
- {
- // Load JNDI properties
- Properties properties = new Properties();
- try(InputStream is = this.getClass().getClassLoader().getResourceAsStream("stress-test-client.properties"))
- {
- properties.load(is);
- }
- Context ctx = new InitialContext(properties);
-
- ConnectionFactory conFac;
- if(lowPrefetch)
- {
- System.out.println(CLASS + ": Using lowprefetch connection factory");
- conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactoryLowPrefetch");
- }
- else
- {
- conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
- }
-
- //ensure the queue to be used exists and is bound
- System.out.println(CLASS + ": Creating queue: " + queueString);
- Connection startupConn = conFac.createConnection();
- Session startupSess = startupConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination startupDestination = startupSess.createQueue(queueString);
- MessageConsumer startupConsumer = startupSess.createConsumer(startupDestination);
- startupConsumer.close();
- startupSess.close();
- startupConn.close();
-
- for(int rep = 1 ; rep <= repetitions; rep++)
- {
- ArrayList<Connection> connectionList = new ArrayList<>();
-
- for (int co= 1; co<= numConnections ; co++)
- {
- if( co % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating connection " + co);
- }
- Connection conn = conFac.createConnection();
- conn.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException jmse)
- {
- System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
- jmse.printStackTrace();
- System.exit(0);
- }
- });
-
- connectionList.add(conn);
- conn.start();
- for (int se= 1; se<= numSessions ; se++)
- {
- if( se % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating Session " + se);
- }
- try
- {
- Session sess;
- if(transacted)
- {
- sess = conn.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- BytesMessage message = sess.createBytesMessage();
-
- message.writeBytes(sentBytes);
-
- if(!random && numMessage == 1 && numSessions == 1 && numConnections == 1 && repetitions == 1)
- {
- //null the array to save memory
- sentBytes = null;
- }
-
- Destination destination = sess.createQueue(queueString);
-
- MessageConsumer consumer = null;
- for(int cns = 1 ; cns <= numConsumers ; cns++)
- {
- if( cns % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating Consumer " + cns);
- }
- consumer = sess.createConsumer(destination);
- }
-
- for(int pr = 1 ; pr <= numProducers ; pr++)
- {
- if( pr % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating Producer " + pr);
- }
- MessageProducer prod = sess.createProducer(destination);
- for(int me = 1; me <= numMessage ; me++)
- {
- if( me % reportingMod == 0)
- {
- System.out.println(CLASS + ": Sending Message " + me);
- }
- prod.send(message, deliveryMode,
- Message.DEFAULT_PRIORITY,
- Message.DEFAULT_TIME_TO_LIVE);
- if(transacted && me % txBatch == 0)
- {
- sess.commit();
- }
- }
- }
-
- if(numConsumers > 0 && consumeImmediately)
- {
- for(int cs = 1 ; cs <= numMessage ; cs++)
- {
- if( cs % reportingMod == 0)
- {
- System.out.println(CLASS + ": Consuming Message " + cs);
- }
- BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout);
-
- if(transacted && cs % txBatch == 0)
- {
- sess.commit();
- }
-
- if(msg == null)
- {
- throw new RuntimeException("Expected message not received in allowed time: " + recieveTimeout);
- }
-
- validateReceivedMessageContent(sentBytes, msg, random, messageSize);
- }
-
- if(closeConsumers)
- {
- sess.close();
- }
- }
-
- }
- catch (Exception exp)
- {
- System.err.println(CLASS + ": Caught an Exception: " + exp);
- exp.printStackTrace();
- }
-
- }
- }
-
- if(numConsumers == -1 && !consumeImmediately)
- {
- System.out.println(CLASS + ": Consuming left over messages, using recieve timeout:" + recieveTimeout);
-
- Connection conn = conFac.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = sess.createQueue(queueString);
- MessageConsumer consumer = sess.createConsumer(destination);
- conn.start();
-
- int count = 0;
- while(true)
- {
- BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout);
-
- if(msg == null)
- {
- System.out.println(CLASS + ": Received " + count + " messages");
- break;
- }
- else
- {
- count++;
- }
-
- validateReceivedMessageContent(sentBytes, msg, random, messageSize);
- }
-
- consumer.close();
- sess.close();
- conn.close();
- }
-
- if(delayClose > 0)
- {
- System.out.println(CLASS + ": Delaying closing connections: " + delayClose);
- Thread.sleep(delayClose);
- }
-
- // Close the connections to the server
- System.out.println(CLASS + ": Closing connections");
-
- for(int connection = 0 ; connection < connectionList.size() ; connection++)
- {
- if( (connection+1) % reportingMod == 0)
- {
- System.out.println(CLASS + ": Closing connection " + (connection+1));
- }
- Connection c = connectionList.get(connection);
- c.close();
- }
-
- // Close the JNDI reference
- System.out.println(CLASS + ": Closing JNDI context");
- ctx.close();
- }
- }
- catch (Exception exp)
- {
- System.err.println(CLASS + ": Caught an Exception: " + exp);
- exp.printStackTrace();
- }
- }
-
-
- private byte[] generateMessage(boolean random, int messageSize)
- {
- byte[] sentBytes = new byte[messageSize];
- if(random)
- {
- //fill the array with numbers from 0-9
- Random rand = new Random(System.currentTimeMillis());
- for(int r = 0 ; r < messageSize ; r++)
- {
- sentBytes[r] = (byte) (48 + rand.nextInt(10));
- }
- }
- else
- {
- //use sequential numbers from 0-9
- for(int r = 0 ; r < messageSize ; r++)
- {
- sentBytes[r] = (byte) (48 + (r % 10));
- }
- }
- return sentBytes;
- }
-
-
- private void validateReceivedMessageContent(byte[] sentBytes,
- BytesMessage msg, boolean random, int messageSize) throws JMSException
- {
- Long length = msg.getBodyLength();
-
- if(length != messageSize)
- {
- throw new RuntimeException("Incorrect number of bytes received");
- }
-
- byte[] recievedBytes = new byte[length.intValue()];
- msg.readBytes(recievedBytes);
-
- if(random)
- {
- if(!Arrays.equals(sentBytes, recievedBytes))
- {
- throw new RuntimeException("Incorrect value of bytes received");
- }
- }
- else
- {
- for(int r = 0 ; r < messageSize ; r++)
- {
- if(! (recievedBytes[r] == (byte) (48 + (r % 10))))
- {
- throw new RuntimeException("Incorrect value of bytes received");
- }
- }
- }
- }
-}
-
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
deleted file mode 100644
index 18870bac59..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-
-import javax.jms.Connection;
-
-public interface TestConfiguration
-{
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
- public final static String TIMESTAMP = "ts";
-
- public final static String EOS = "eos";
-
- public final static String SEQUENCE_NUMBER = "sn";
-
- public String getUrl();
-
- public String getHost();
-
- public int getPort();
-
- public String getAddress();
-
- public long getTimeout();
-
- public int getAckMode();
-
- public int getMsgCount();
-
- public int getMsgSize();
-
- public int getRandomMsgSizeStartFrom();
-
- public boolean isDurable();
-
- public boolean isTransacted();
-
- public int getTransactionSize();
-
- public int getWarmupCount();
-
- public boolean isCacheMessage();
-
- public boolean isDisableMessageID();
-
- public boolean isDisableTimestamp();
-
- public boolean isRandomMsgSize();
-
- public String getMessageType();
-
- public boolean isPrintStdDev();
-
- public int getSendRate();
-
- public boolean isExternalController();
-
- public boolean isUseUniqueDests();
-
- public int getAckFrequency();
-
- public Connection createConnection() throws Exception;
-
- public DecimalFormat getDecimalFormat();
-
- public int reportEvery();
-
- public boolean isReportTotal();
-
- public boolean isReportHeader();
-
- public int getSendEOS();
-
- public int getConnectionCount();
-
- public int getRollbackFrequency();
-
- public boolean isPrintHeaders();
-
- public boolean isPrintContent();
-
- public long getTTL();
-
- public int getPriority();
-
- public String getReadyAddress();
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
deleted file mode 100644
index a9896c1d4e..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-import java.lang.reflect.Constructor;
-
-import javax.jms.Message;
-
-public class BasicReporter implements Reporter
-{
- PrintStream out;
- int batchSize = 0;
- int batchCount = 0;
- boolean headerPrinted = false;
- protected Statistics overall;
- Statistics batch;
- Constructor<? extends Statistics> statCtor;
-
- public BasicReporter(Class<? extends Statistics> clazz, PrintStream out,
- int batchSize, boolean printHeader) throws Exception
- {
- this.out = out;
- this.batchSize = batchSize;
- this.headerPrinted = !printHeader;
- statCtor = clazz.getConstructor();
- overall = statCtor.newInstance();
- if (batchSize > 0)
- {
- batch = statCtor.newInstance();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#message(javax.jms.Message)
- */
- @Override
- public void message(Message msg)
- {
- overall.message(msg);
- if (batchSize > 0)
- {
- batch.message(msg);
- if (++batchCount == batchSize)
- {
- if (!headerPrinted)
- {
- header();
- }
- batch.report(out);
- batch.clear();
- batchCount = 0;
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#report()
- */
- @Override
- public void report()
- {
- if (!headerPrinted)
- {
- header();
- }
- overall.report(out);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#header()
- */
- @Override
- public void header()
- {
- headerPrinted = true;
- overall.header(out);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#log()
- */
- @Override
- public void log(String s)
- {
- // NOOP
- }
-
- @Override
- public void clear()
- {
- batch.clear();
- overall.clear();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
deleted file mode 100644
index e9bf7100c1..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-
-import org.apache.qpid.tools.report.Statistics.Throughput;
-import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
-
-public class MercuryReporter extends BasicReporter
-{
- MercuryStatistics stats;
-
- public MercuryReporter(Class<? extends MercuryStatistics> clazz, PrintStream out,
- int batchSize, boolean printHeader) throws Exception
- {
- super(clazz, out, batchSize, printHeader);
- stats = (MercuryStatistics)overall;
- }
-
- public double getRate()
- {
- return stats.getRate();
- }
-
- public double getAvgLatency()
- {
- return stats.getAvgLatency();
- }
-
- public double getStdDev()
- {
- return stats.getStdDev();
- }
-
- public double getMinLatency()
- {
- return stats.getMinLatency();
- }
-
- public double getMaxLatency()
- {
- return stats.getMaxLatency();
- }
-
- public int getSampleSize()
- {
- return stats.getSampleSize();
- }
-
- public interface MercuryStatistics extends Statistics
- {
- public double getRate();
- public long getMinLatency();
- public long getMaxLatency();
- public double getAvgLatency();
- public double getStdDev();
- public int getSampleSize();
- }
-
- public static class MercuryThroughput extends Throughput implements MercuryStatistics
- {
- double rate = 0;
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- rate = (double)messages/(double)elapsed;
- }
-
- @Override
- public void clear()
- {
- super.clear();
- rate = 0;
- }
-
- public double getRate()
- {
- return rate;
- }
-
- public int getSampleSize()
- {
- return messages;
- }
-
- public long getMinLatency() { return 0; }
- public long getMaxLatency() { return 0; }
- public double getAvgLatency(){ return 0; }
- public double getStdDev(){ return 0; }
-
- }
-
- public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics
- {
- double rate = 0;
- double avgLatency = 0;
- double stdDev;
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- rate = (double)messages/(double)elapsed;
- avgLatency = totalLatency/(double)sampleCount;
- }
-
- @Override
- public void clear()
- {
- super.clear();
- rate = 0;
- avgLatency = 0;
- }
-
- public double getRate()
- {
- return rate;
- }
-
- public long getMinLatency()
- {
- return minLatency;
- }
-
- public long getMaxLatency()
- {
- return maxLatency;
- }
-
- public double getAvgLatency()
- {
- return avgLatency;
- }
-
- public double getStdDev()
- {
- return stdDev;
- }
-
- public int getSampleSize()
- {
- return messages;
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
deleted file mode 100644
index 5e481458be..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import javax.jms.Message;
-
-public interface Reporter
-{
-
- public void message(Message msg);
-
- public void report();
-
- public void header();
-
- // Will be used by some reporters to print statements which are greped by
- // scripts. Example see java/tools/bin/perf-report
- public void log(String s);
-
- public void clear();
-
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
deleted file mode 100644
index db8b4ddcee..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-
-import javax.jms.Message;
-
-import org.apache.qpid.tools.TestConfiguration;
-
-public interface Statistics
-{
- public void message(Message msg);
- public void report(PrintStream out);
- public void header(PrintStream out);
- public void clear();
-
- static class Throughput implements Statistics
- {
- DecimalFormat df = new DecimalFormat("###");
- int messages = 0;
- long start = 0;
- boolean started = false;
-
- @Override
- public void message(Message msg)
- {
- ++messages;
- if (!started)
- {
- start = System.currentTimeMillis();
- started = true;
- }
- }
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- out.println(df.format((double)messages/(double)elapsed));
- }
-
- @Override
- public void header(PrintStream out)
- {
- out.println("tp(m/s)");
- }
-
- public void clear()
- {
- messages = 0;
- start = 0;
- started = false;
- }
- }
-
- static class ThroughputAndLatency extends Throughput
- {
- long minLatency = Long.MAX_VALUE;
- long maxLatency = Long.MIN_VALUE;
- double totalLatency = 0;
- int sampleCount = 0;
-
- @Override
- public void message(Message msg)
- {
- super.message(msg);
- try
- {
- long ts = msg.getLongProperty(TestConfiguration.TIMESTAMP);
- long latency = System.currentTimeMillis() - ts;
- minLatency = Math.min(latency, minLatency);
- maxLatency = Math.max(latency, maxLatency);
- totalLatency = totalLatency + latency;
- sampleCount++;
- }
- catch(Exception e)
- {
- System.out.println("Error calculating latency " + e);
- }
- }
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- double rate = (double)messages/(double)elapsed;
- double avgLatency = totalLatency/(double)sampleCount;
- out.append("\n")
- .append(df.format(rate))
- .append('\t')
- .append(String.valueOf(minLatency))
- .append('\t')
- .append(String.valueOf(maxLatency))
- .append('\t')
- .append(df.format(avgLatency))
- .append("\n");
-
- out.flush();
- }
-
- @Override
- public void header(PrintStream out)
- {
- out.append("tp(m/s)")
- .append('\t')
- .append("l-min")
- .append('\t')
- .append("l-max")
- .append('\t')
- .append("l-avg");
-
- out.flush();
- }
-
- public void clear()
- {
- super.clear();
- minLatency = 0;
- maxLatency = 0;
- totalLatency = 0;
- sampleCount = 0;
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java
deleted file mode 100644
index a71b466a0f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.util;
-
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Set;
-
-public class ArgumentsParser
-{
- public ArgumentsParser()
- {
- }
-
- public <T> T parse(String[] args, Class<T> pojoClass)
- {
- T object;
- try
- {
- object = pojoClass.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Cannot instantiate object of class " + pojoClass, e);
- }
-
- for (String arg: args)
- {
- int pos = arg.indexOf('=');
- if (pos == -1)
- {
- throw new IllegalArgumentException("Invalid argument '" + arg + "' Argument should be specified in format <name>=<value>");
- }
- String name = arg.substring(0, pos);
- String value = arg.substring(pos + 1);
-
- Field field = findField(pojoClass, name);
- if (field != null)
- {
- setField(object, field, value);
- }
- }
- return object;
- }
-
- private Field findField(Class<?> objectClass, String name)
- {
- Field[] fields = objectClass.getDeclaredFields();
-
- Field field = null;
- for (int i = 0 ; i< fields.length ; i++)
- {
- if (fields[i].getName().equals(name) && !Modifier.isFinal(fields[i].getModifiers()))
- {
- field = fields[i];
- break;
- }
- }
- return field;
- }
-
- private void setField(Object object, Field field, String value)
- {
- Object convertedValue = convertStringToType(value, field.getType(), field.getName());
-
- field.setAccessible(true);
-
- try
- {
- field.set(object, convertedValue);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException("Cannot access field " + field.getName());
- }
- }
-
- private Object convertStringToType(String value, Class<?> fieldType, String fieldName)
- {
- Object o;
- if (fieldType == String.class)
- {
- o = value;
- }
- else if (fieldType == boolean.class)
- {
- try
- {
- o = Boolean.parseBoolean(value);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Cannot convert to boolean argument " + fieldName);
- }
- }
- else if (fieldType == int.class)
- {
- try
- {
- o = Integer.parseInt(value);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Cannot convert to int argument " + fieldName);
- }
- }
- else
- {
- throw new RuntimeException("Unsupported tye " + fieldType + " in " + fieldName);
- }
- return o;
- }
-
- public void usage(Class<?> objectClass, Set<String> requiredFields)
- {
- System.out.println("Supported arguments:");
- Field[] fields = objectClass.getDeclaredFields();
-
- Object object = null;
- try
- {
- object = objectClass.newInstance();
- }
- catch(Exception e)
- {
- // ignore any
- }
-
- for (int i = 0 ; i< fields.length ; i++)
- {
- Field field = fields[i];
- if (!Modifier.isFinal(field.getModifiers()))
- {
- Object defaultValue = null;
- try
- {
- field.setAccessible(true);
- defaultValue = field.get(object);
- }
- catch(Exception e)
- {
- // ignore any
- }
-
- System.out.println(" " + field.getName() + " ( type: "
- + field.getType().getSimpleName().toLowerCase()
- + (object != null ? ", default: " + defaultValue : "")
- + (requiredFields != null && requiredFields.contains(field.getName()) ? ", mandatory" : "")
- + ")");
- }
- }
- }
-}
diff --git a/qpid/java/tools/src/main/resources/stress-test-client.properties b/qpid/java/tools/src/main/resources/stress-test-client.properties
deleted file mode 100644
index 2ef8c258b4..0000000000
--- a/qpid/java/tools/src/main/resources/stress-test-client.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
-connectionfactory.qpidConnectionfactoryLowPrefetch=amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672?maxprefetch='10''