summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java192
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java176
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java4
-rw-r--r--java/cluster/src/test/java/log4j.properties28
-rwxr-xr-xjava/perftests/bin/run_many.sh30
-rwxr-xr-xjava/perftests/bin/serviceProvidingClient.sh25
-rwxr-xr-xjava/perftests/bin/serviceRequestingClient.sh27
-rwxr-xr-xjava/perftests/bin/setupclasspath.sh9
-rwxr-xr-xjava/perftests/bin/topicListener.sh25
-rwxr-xr-xjava/perftests/bin/topicPublisher.sh23
-rw-r--r--java/perftests/pom.xml55
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java35
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java69
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java29
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/Connector.java40
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java28
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java111
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java201
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java303
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Config.java243
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Listener.java141
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java153
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java175
-rw-r--r--java/pom.xml8
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java82
26 files changed, 2080 insertions, 182 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
new file mode 100644
index 0000000000..509f57be7f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.ObjectName;
+import javax.management.MalformedObjectNameException;
+
+/**
+ * This MBean implements the broker management interface and exposes the
+ * Broker level management features like creating and deleting exchanges and queue.
+ */
+@MBeanDescription("This MBean exposes the broker level management features")
+public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBroker
+{
+ private final QueueRegistry _queueRegistry;
+ private final ExchangeRegistry _exchangeRegistry;
+ private final ExchangeFactory _exchangeFactory;
+ private final MessageStore _messageStore;
+
+ @MBeanConstructor("Creates the Broker Manager MBean")
+ public AMQBrokerManagerMBean() throws JMException
+ {
+ super(ManagedBroker.class, ManagedBroker.TYPE);
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _queueRegistry = appRegistry.getQueueRegistry();
+ _exchangeRegistry = appRegistry.getExchangeRegistry();
+ _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
+ _messageStore = ApplicationRegistry.getInstance().getMessageStore();
+ }
+
+ public String getObjectInstanceName()
+ {
+ return this.getClass().getName();
+ }
+
+ /**
+ * Creates new exchange and registers it with the registry.
+ *
+ * @param exchangeName
+ * @param type
+ * @param durable
+ * @param autoDelete
+ * @throws JMException
+ */
+ public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete)
+ throws JMException
+ {
+ try
+ {
+ synchronized (_exchangeRegistry)
+ {
+ Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
+ if (exchange == null)
+ {
+ exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0);
+ _exchangeRegistry.registerExchange(exchange);
+ }
+ else
+ {
+ throw new JMException("The exchange \"" + exchangeName + "\" already exists.");
+ }
+ }
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, "Error in creating exchange " + exchangeName);
+ }
+ }
+
+ /**
+ * Unregisters the exchange from registry.
+ *
+ * @param exchangeName
+ * @throws JMException
+ */
+ public void unregisterExchange(String exchangeName) throws JMException
+ {
+ // TODO
+ // Check if the exchange is in use.
+ // boolean inUse = false;
+ // Check if there are queue-bindings with the exchange and unregister
+ // when there are no bindings.
+ try
+ {
+ _exchangeRegistry.unregisterExchange(exchangeName, false);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, "Error in unregistering exchange " + exchangeName);
+ }
+ }
+
+ /**
+ * Creates a new queue and registers it with the registry and puts it
+ * in persistance storage if durable queue.
+ *
+ * @param queueName
+ * @param durable
+ * @param owner
+ * @param autoDelete
+ * @throws JMException
+ */
+ public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete)
+ throws JMException
+ {
+ AMQQueue queue = _queueRegistry.getQueue(queueName);
+ if (queue != null)
+ {
+ throw new JMException("The queue \"" + queueName + "\" already exists.");
+ }
+
+ try
+ {
+ queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ _messageStore.createQueue(queue);
+ }
+ _queueRegistry.registerQueue(queue);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex,"Error in creating queue " + queueName);
+ }
+ }
+
+ /**
+ * Deletes the queue from queue registry and persistant storage.
+ *
+ * @param queueName
+ * @throws JMException
+ */
+ public void deleteQueue(String queueName) throws JMException
+ {
+ AMQQueue queue = _queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ throw new JMException("The Queue " + queueName + " is not a registerd queue.");
+ }
+
+ try
+ {
+ queue.delete();
+ _messageStore.removeQueue(queueName);
+
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex, ex.toString());
+ }
+ }
+
+ public ObjectName getObjectName() throws MalformedObjectNameException
+ {
+ StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
+ objectName.append(":type=").append(getType());
+
+ return new ObjectName(objectName.toString());
+ }
+} // End of MBean class
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 553aecc217..ffd25de0b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -40,29 +40,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
import org.apache.qpid.server.protocol.AMQPProtocolProvider;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.ConnectorConfiguration;
import org.apache.qpid.url.URLSyntaxException;
import javax.management.JMException;
-import javax.management.MBeanException;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@@ -75,7 +62,7 @@ import java.util.StringTokenizer;
* Main entry point for AMQPD.
*
*/
-public class Main implements ProtocolVersionList
+public class Main implements ProtocolVersionList, Managable
{
private static final Logger _logger = Logger.getLogger(Main.class);
@@ -83,6 +70,8 @@ public class Main implements ProtocolVersionList
private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ private AMQBrokerManagerMBean _mbean = null;
+
protected static class InitException extends Exception
{
InitException(String msg)
@@ -445,7 +434,8 @@ public class Main implements ProtocolVersionList
{
try
{
- new AMQBrokerManager().register();
+ _mbean = new AMQBrokerManagerMBean();
+ _mbean.register();
}
catch (JMException ex)
{
@@ -453,156 +443,8 @@ public class Main implements ProtocolVersionList
}
}
- /**
- * AMQPBrokerMBean implements the broker management interface and exposes the
- * Broker level management features like creating and deleting exchanges and queue.
- */
- @MBeanDescription("This MBean exposes the broker level management features")
- private final class AMQBrokerManager extends AMQManagedObject implements ManagedBroker
+ public ManagedObject getManagedObject()
{
- private final QueueRegistry _queueRegistry;
- private final ExchangeRegistry _exchangeRegistry;
- private final ExchangeFactory _exchangeFactory;
- private final MessageStore _messageStore;
-
- @MBeanConstructor("Creates the Broker Manager MBean")
- protected AMQBrokerManager() throws JMException
- {
- super(ManagedBroker.class, ManagedBroker.TYPE);
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
- _exchangeRegistry = appRegistry.getExchangeRegistry();
- _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
- _messageStore = ApplicationRegistry.getInstance().getMessageStore();
- }
-
- public String getObjectInstanceName()
- {
- return this.getClass().getName();
- }
-
- /**
- * Creates new exchange and registers it with the registry.
- * @param exchangeName
- * @param type
- * @param durable
- * @param autoDelete
- * @throws JMException
- */
- public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete)
- throws JMException
- {
- try
- {
- synchronized(_exchangeRegistry)
- {
- Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
- if (exchange == null)
- {
- exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0);
- _exchangeRegistry.registerExchange(exchange);
- }
- else
- {
- throw new JMException("The exchange \"" + exchangeName + "\" already exists.");
- }
- }
- }
- catch(AMQException ex)
- {
- _logger.error("Error in creating exchange " + exchangeName, ex);
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * Unregisters the exchange from registry.
- * @param exchangeName
- * @throws JMException
- */
- public void unregisterExchange(String exchangeName) throws JMException
- {
- // TODO
- // Check if the exchange is in use.
- // boolean inUse = false;
- // Check if there are queue-bindings with the exchange and unregister
- // when there are no bindings.
- try
- {
- _exchangeRegistry.unregisterExchange(exchangeName, false);
- }
- catch(AMQException ex)
- {
- _logger.error("Error in unregistering exchange " + exchangeName, ex);
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * Creates a new queue and registers it with the registry and puts it
- * in persistance storage if durable queue.
- * @param queueName
- * @param durable
- * @param owner
- * @param autoDelete
- * @throws JMException
- */
- public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete)
- throws JMException
- {
- AMQQueue queue = _queueRegistry.getQueue(queueName);
- if (queue != null)
- {
- throw new JMException("The queue \"" + queueName + "\" already exists.");
- }
-
- try
- {
- queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry);
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- _messageStore.createQueue(queue);
- }
- _queueRegistry.registerQueue(queue);
- }
- catch (AMQException ex)
- {
- _logger.error("Error in creating queue " + queueName, ex);
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
- * Deletes the queue from queue registry and persistant storage.
- * @param queueName
- * @throws JMException
- */
- public void deleteQueue(String queueName) throws JMException
- {
- AMQQueue queue = _queueRegistry.getQueue(queueName);
- if (queue == null)
- {
- throw new JMException("The Queue " + queueName + " is not a registerd queue.");
- }
-
- try
- {
- queue.delete();
- _messageStore.removeQueue(queueName);
-
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- public ObjectName getObjectName() throws MalformedObjectNameException
- {
- StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
- objectName.append(":type=").append(getType());
-
- return new ObjectName(objectName.toString());
- }
- } // End of MBean class
+ return _mbean;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 23e2754eb2..93baa3fc29 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -213,7 +213,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
return;
}
- _log.info("Async Delivery Message:" + message + " to :" + sub);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ }
sub.send(message, _queue);
@@ -278,7 +281,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void deliver(String name, AMQMessage msg) throws AMQException
{
- _log.info(id() + "deliver :" + System.identityHashCode(msg));
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+ }
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -288,7 +294,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (s == null) //no-one can take the message right now.
{
- _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ }
if (!msg.getPublishBody().immediate)
{
addMessageToQueue(msg);
@@ -297,21 +306,33 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.unlock();
//Pre Deliver to all subscriptions
- _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ " subscribers to give the message to.");
+ }
for (Subscription sub : _subscriptions.getSubscriptions())
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
if (_queue.isShared() && msg.getDeliveredToConsumer())
{
- _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ ") is already delivered.");
+ }
continue;
}
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+ }
sub.enqueueForPreDelivery(msg);
}
}
@@ -322,7 +343,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//release lock now
_lock.unlock();
- _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
//Deliver the message
s.send(msg, _queue);
}
@@ -330,7 +355,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
finally
{
//ensure lock is released
- if (_lock.isLocked())
+ if (_lock.isHeldByCurrentThread())
{
_lock.unlock();
}
@@ -371,9 +396,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void processAsync(Executor executor)
{
- _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+ }
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index c8de298ba1..c9d29d8077 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -132,10 +132,6 @@ public class QueueSenderAdapter implements QueueSender {
throw new javax.jms.IllegalStateException("Publisher is closed");
}
- if(queue == null){
- throw new UnsupportedOperationException("Queue is null");
- }
-
AMQSession session = ((BasicMessageProducer) _delegate).getSession();
if(session == null || session.isClosed()){
diff --git a/java/cluster/src/test/java/log4j.properties b/java/cluster/src/test/java/log4j.properties
new file mode 100644
index 0000000000..6d596d1d19
--- /dev/null
+++ b/java/cluster/src/test/java/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/perftests/bin/run_many.sh b/java/perftests/bin/run_many.sh
new file mode 100755
index 0000000000..cca2ffec21
--- /dev/null
+++ b/java/perftests/bin/run_many.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# args:
+# <number of processes to start>
+# <name of run>
+# <command ro run>
+
+for i in `seq 1 $1`; do
+ $3 >$2.$i.out 2>>$2.err &
+ echo $! > $2.$i.pid
+done;
diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh
new file mode 100755
index 0000000000..6b00486cd2
--- /dev/null
+++ b/java/perftests/bin/serviceProvidingClient.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo $CP
+# usage: just pass in the host(s)
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level=INFO org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ
diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh
new file mode 100755
index 0000000000..7dd3d63c27
--- /dev/null
+++ b/java/perftests/bin/serviceRequestingClient.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# args supplied: <host:port> <num messages>
+thehosts=$1
+shift
+echo $thehosts
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo $CP
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@"
diff --git a/java/perftests/bin/setupclasspath.sh b/java/perftests/bin/setupclasspath.sh
new file mode 100755
index 0000000000..a660392e77
--- /dev/null
+++ b/java/perftests/bin/setupclasspath.sh
@@ -0,0 +1,9 @@
+if [ -z $QPID_HOME ] ; then
+ echo "QPID_HOME must be set"
+ exit
+fi
+CP=$QPID_HOME/lib/qpid-incubating.jar:../target/classes
+
+if [ `uname -o` == "Cygwin" ] ; then
+ CP=`cygpath --path --windows $CP`
+fi
diff --git a/java/perftests/bin/topicListener.sh b/java/perftests/bin/topicListener.sh
new file mode 100755
index 0000000000..454efefe7d
--- /dev/null
+++ b/java/perftests/bin/topicListener.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# XXX -Xmx512m -Xms512m -XX:NewSize=150m
+. ./setupclasspath.sh
+echo $CP
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.topic.Listener $*
diff --git a/java/perftests/bin/topicPublisher.sh b/java/perftests/bin/topicPublisher.sh
new file mode 100755
index 0000000000..cc3a8736cc
--- /dev/null
+++ b/java/perftests/bin/topicPublisher.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# XXX -Xmx512m -Xms512m -XX:NewSize=150m
+. ./setupclasspath.sh
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.topic.Publisher $*
diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml
new file mode 100644
index 0000000000..3af906c4ac
--- /dev/null
+++ b/java/perftests/pom.xml
@@ -0,0 +1,55 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-perftests</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid Performance Tests</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ </dependency>
+ </dependencies>
+
+<!-- <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>-->
+</project>
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..cac0064785
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.ConnectorConfig;
+
+import javax.jms.ConnectionFactory;
+
+class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config)
+ {
+ return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path");
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
new file mode 100644
index 0000000000..04381d66a0
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+public abstract class AbstractConfig
+{
+ public boolean setOptions(String[] argv)
+ {
+ try
+ {
+ for(int i = 0; i < argv.length - 1; i += 2)
+ {
+ String key = argv[i];
+ String value = argv[i+1];
+ setOption(key, value);
+ }
+ return true;
+ }
+ catch(Exception e)
+ {
+ System.out.println(e.getMessage());
+ }
+ return false;
+ }
+
+ protected int parseInt(String msg, String i)
+ {
+ try
+ {
+ return Integer.parseInt(i);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException(msg + ": " + i);
+ }
+ }
+
+ protected long parseLong(String msg, String i)
+ {
+ try
+ {
+ return Long.parseLong(i);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException(msg + ": " + i);
+ }
+ }
+
+ public abstract void setOption(String key, String value);
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..a9984eb09a
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+public interface ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException;
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/Connector.java b/java/perftests/src/main/java/org/apache/qpid/config/Connector.java
new file mode 100644
index 0000000000..ff2377f087
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/config/Connector.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+public class Connector
+{
+ public Connection createConnection(ConnectorConfig config) throws Exception
+ {
+ return getConnectionFactory(config).createConnection();
+ }
+
+ ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception
+ {
+ String factory = config.getFactory();
+ if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName();
+ System.out.println("Using " + factory);
+ return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config);
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java
new file mode 100644
index 0000000000..b120ed3f12
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+public interface ConnectorConfig
+{
+ public String getHost();
+ public int getPort();
+ public String getFactory();
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
new file mode 100644
index 0000000000..44285efd96
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.config;
+
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.ConnectorConfig;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.MBeanException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.NameNotFoundException;
+import java.util.Hashtable;
+
+public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser
+{
+ public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException
+ {
+ ConnectionFactory cf = null;
+ InitialContext ic = null;
+ Hashtable ht = new Hashtable();
+ ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+ String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01");
+ String jbossPort = System.getProperty("jboss.port", "1099");
+ ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort);
+ ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
+
+ try
+ {
+ ic = new InitialContext(ht);
+ if (!doesDestinationExist("topictest.messages", ic))
+ {
+ deployTopic("topictest.messages", ic);
+ }
+ if (!doesDestinationExist("topictest.control", ic))
+ {
+ deployTopic("topictest.control", ic);
+ }
+
+ cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
+ return cf;
+ }
+ catch (NamingException e)
+ {
+ throw new JMSException("Unable to lookup object: " + e);
+ }
+ catch (Exception e)
+ {
+ throw new JMSException("Error creating topic: " + e);
+ }
+ }
+
+ private boolean doesDestinationExist(String name, InitialContext ic) throws Exception
+ {
+ try
+ {
+ ic.lookup("/" + name);
+ }
+ catch (NameNotFoundException e)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private void deployTopic(String name, InitialContext ic) throws Exception
+ {
+ MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic);
+
+ ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer");
+
+ String jndiName = "/" + name;
+ try
+ {
+ mBeanServer.invoke(serverObjectName, "createTopic",
+ new Object[]{name, jndiName},
+ new String[]{"java.lang.String", "java.lang.String"});
+ }
+ catch (MBeanException e)
+ {
+ System.err.println("Error: " + e);
+ System.err.println("Cause: " + e.getCause());
+ }
+ }
+
+ private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException
+ {
+ return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor");
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
new file mode 100644
index 0000000000..ddee643a76
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceProvidingClient
+{
+ private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class);
+
+ private MessageProducer _destinationProducer;
+
+ private Destination _responseDest;
+
+ private AMQConnection _connection;
+
+ public ServiceProvidingClient(String brokerDetails, String username, String password,
+ String clientName, String virtualPath, String serviceName)
+ throws AMQException, JMSException, URLSyntaxException
+ {
+ _connection = new AMQConnection(brokerDetails, username, password,
+ clientName, virtualPath);
+ _connection.setConnectionListener(new ConnectionListener()
+ {
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback");
+ }
+ });
+ final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _logger.info("Service (queue) name is '" + serviceName + "'...");
+
+ AMQQueue destination = new AMQQueue(serviceName);
+
+ MessageConsumer consumer = session.createConsumer(destination,
+ 100, true, false, null);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _messageCount;
+
+ public void onMessage(Message message)
+ {
+ //_logger.info("Got message '" + message + "'");
+
+ TextMessage tm = (TextMessage) message;
+
+ try
+ {
+ Destination responseDest = tm.getJMSReplyTo();
+ if (responseDest == null)
+ {
+ _logger.info("Producer not created because the response destination is null.");
+ return;
+ }
+
+ if (!responseDest.equals(_responseDest))
+ {
+ _responseDest = responseDest;
+
+ _logger.info("About to create a producer");
+ _destinationProducer = session.createProducer(responseDest);
+ _destinationProducer.setDisableMessageTimestamp(true);
+ _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _logger.info("After create a producer");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error creating destination");
+ }
+ _messageCount++;
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Received message total: " + _messageCount);
+ _logger.info("Sending response to '" + _responseDest + "'");
+ }
+
+ try
+ {
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+ TextMessage msg = session.createTextMessage(payload);
+ if (tm.propertyExists("timeSent"))
+ {
+ _logger.info("timeSent property set on message");
+ _logger.info("timeSent value is: " + tm.getLongProperty("timeSent"));
+ msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
+ }
+ _destinationProducer.send(msg);
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Sent response to '" + _responseDest + "'");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error sending message: " + e, e);
+ }
+ }
+ });
+ }
+
+ public void run() throws JMSException
+ {
+ _connection.start();
+ _logger.info("Waiting...");
+ }
+
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ if (args.length < 5)
+ {
+ System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+ System.exit(1);
+ }
+ String clientId = null;
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+ clientId = address.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+
+ try
+ {
+ ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
+ clientId, args[3], args[4]);
+ client.run();
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.error("Error: " + e, e);
+ }
+
+
+
+ }
+
+}
+
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
new file mode 100644
index 0000000000..b52d06558a
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages containing a property that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * </ul>
+ *
+ */
+public class ServiceRequestingClient implements ExceptionListener
+{
+ private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
+
+ private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private String MESSAGE_DATA;
+
+ private AMQConnection _connection;
+
+ private Session _session;
+
+ private long _averageLatency;
+
+ private int _messageCount;
+
+ private volatile boolean _completed;
+
+ private AMQDestination _tempDestination;
+
+ private MessageProducer _producer;
+
+ private Object _waiter;
+
+ private static String createMessagePayload(int size)
+ {
+ _log.info("Message size set to " + size + " bytes");
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count < size + MESSAGE_DATA_BYTES.length())
+ {
+ buf.append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.length();
+ }
+ if (count < size)
+ {
+ buf.append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+
+ private class CallbackHandler implements MessageListener
+ {
+ private int _expectedMessageCount;
+
+ private int _actualMessageCount;
+
+ private long _startTime;
+
+ public CallbackHandler(int expectedMessageCount, long startTime)
+ {
+ _expectedMessageCount = expectedMessageCount;
+ _startTime = startTime;
+ }
+
+ public void onMessage(Message m)
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message received: " + m);
+ }
+ try
+ {
+ m.getPropertyNames();
+ if (m.propertyExists("timeSent"))
+ {
+ long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
+ long now = System.currentTimeMillis();
+ if (_averageLatency == 0)
+ {
+ _averageLatency = now - timeSent;
+ _log.info("Latency " + _averageLatency);
+ }
+ else
+ {
+ _log.info("Individual latency: " + (now - timeSent));
+ _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+ _log.info("Average latency now: " + _averageLatency);
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error getting latency data: " + e, e);
+ }
+ _actualMessageCount++;
+ if (_actualMessageCount % 1000 == 0)
+ {
+ _log.info("Received message count: " + _actualMessageCount);
+ }
+
+ if (_actualMessageCount == _expectedMessageCount)
+ {
+ _completed = true;
+ notifyWaiter();
+ long timeTaken = System.currentTimeMillis() - _startTime;
+ _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
+ timeTaken + "ms, equivalent to " +
+ (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+ try
+ {
+ _connection.close();
+ _log.info("Connection closed");
+ }
+ catch (JMSException e)
+ {
+ _log.error("Error closing connection");
+ }
+ }
+ }
+ }
+
+ private void notifyWaiter()
+ {
+ if (_waiter != null)
+ {
+ synchronized (_waiter)
+ {
+ _waiter.notify();
+ }
+ }
+ }
+ public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
+ String vpath, String commandQueueName,
+ final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
+ {
+ _messageCount = messageCount;
+ MESSAGE_DATA = createMessagePayload(messageDataLength);
+ try
+ {
+ createConnection(brokerHosts, clientID, username, password, vpath);
+ _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ _connection.setExceptionListener(this);
+
+
+ AMQQueue destination = new AMQQueue(commandQueueName);
+ _producer = (MessageProducer) _session.createProducer(destination);
+ _producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ _tempDestination = new AMQQueue("TempResponse" +
+ Long.toString(System.currentTimeMillis()), true);
+ MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true,
+ true, null);
+
+ //Send first message, then wait a bit to allow the provider to get initialised
+ TextMessage first = _session.createTextMessage(MESSAGE_DATA);
+ first.setJMSReplyTo(_tempDestination);
+ _producer.send(first);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ //now start the clock and the test...
+ final long startTime = System.currentTimeMillis();
+
+ messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ /**
+ * Run the test and notify an object upon receipt of all responses.
+ * @param waiter the object that will be notified
+ * @throws JMSException
+ */
+ public void run(Object waiter) throws JMSException
+ {
+ _waiter = waiter;
+ _connection.start();
+ for (int i = 1; i < _messageCount; i++)
+ {
+ TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+ msg.setJMSReplyTo(_tempDestination);
+ if (i % 1000 == 0)
+ {
+ long timeNow = System.currentTimeMillis();
+ msg.setStringProperty("timeSent", String.valueOf(timeNow));
+ }
+ _producer.send(msg);
+ }
+ _log.info("Finished sending " + _messageCount + " messages");
+ }
+
+ public boolean isCompleted()
+ {
+ return _completed;
+ }
+
+ private void createConnection(String brokerHosts, String clientID, String username, String password,
+ String vpath) throws AMQException, URLSyntaxException
+ {
+ _connection = new AMQConnection(brokerHosts, username, password,
+ clientID, vpath);
+ }
+
+ /**
+ * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
+ * means the server will allocate a name.
+ */
+ public static void main(String[] args)
+ {
+ if (args.length < 6)
+ {
+ System.err.println(
+ "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+ }
+ try
+ {
+ int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+ ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
+ args[4], Integer.parseInt(args[5]),
+ messageDataLength);
+ Object waiter = new Object();
+ client.run(waiter);
+ synchronized (waiter)
+ {
+ while (!client.isCompleted())
+ {
+ waiter.wait();
+ }
+ }
+
+ }
+ catch (UnknownHostException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (Exception e)
+ {
+ System.err.println("Error in client: " + e);
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ */
+ public void onException(JMSException e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace(System.err);
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
new file mode 100644
index 0000000000..bb740f9094
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
@@ -0,0 +1,243 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.config.ConnectionFactoryInitialiser;
+import org.apache.qpid.config.Connector;
+import org.apache.qpid.config.AbstractConfig;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+class Config extends AbstractConfig implements ConnectorConfig
+{
+
+ private String host = "localhost";
+ private int port = 5672;
+ private String factory = null;
+
+ private int payload = 256;
+ private int messages = 1000;
+ private int clients = 1;
+ private int batch = 1;
+ private long delay = 1;
+ private int warmup;
+ private int ackMode= AMQSession.NO_ACKNOWLEDGE;
+ private String clientId;
+ private String subscriptionId;
+ private boolean persistent;
+
+ public Config()
+ {
+ }
+
+ int getAckMode()
+ {
+ return ackMode;
+ }
+
+ void setPayload(int payload)
+ {
+ this.payload = payload;
+ }
+
+ int getPayload()
+ {
+ return payload;
+ }
+
+ void setClients(int clients)
+ {
+ this.clients = clients;
+ }
+
+ int getClients()
+ {
+ return clients;
+ }
+
+ void setMessages(int messages)
+ {
+ this.messages = messages;
+ }
+
+ int getMessages()
+ {
+ return messages;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public String getFactory()
+ {
+ return factory;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ int getBatch()
+ {
+ return batch;
+ }
+
+ void setBatch(int batch)
+ {
+ this.batch = batch;
+ }
+
+ int getWarmup()
+ {
+ return warmup;
+ }
+
+ void setWarmup(int warmup)
+ {
+ this.warmup = warmup;
+ }
+
+ public long getDelay()
+ {
+ return delay;
+ }
+
+ public void setDelay(long delay)
+ {
+ this.delay = delay;
+ }
+
+ String getClientId()
+ {
+ return clientId;
+ }
+
+ String getSubscriptionId()
+ {
+ return subscriptionId;
+ }
+
+ boolean usePersistentMessages()
+ {
+ return persistent;
+ }
+
+ public void setOption(String key, String value)
+ {
+ if("-host".equalsIgnoreCase(key))
+ {
+ setHost(value);
+ }
+ else if("-port".equalsIgnoreCase(key))
+ {
+ try
+ {
+ setPort(Integer.parseInt(value));
+ }
+ catch(NumberFormatException e)
+ {
+ throw new RuntimeException("Bad port number: " + value);
+ }
+ }
+ else if("-payload".equalsIgnoreCase(key))
+ {
+ setPayload(parseInt("Bad payload size", value));
+ }
+ else if("-messages".equalsIgnoreCase(key))
+ {
+ setMessages(parseInt("Bad message count", value));
+ }
+ else if("-clients".equalsIgnoreCase(key))
+ {
+ setClients(parseInt("Bad client count", value));
+ }
+ else if("-batch".equalsIgnoreCase(key))
+ {
+ setBatch(parseInt("Bad batch count", value));
+ }
+ else if("-delay".equalsIgnoreCase(key))
+ {
+ setDelay(parseLong("Bad batch delay", value));
+ }
+ else if("-warmup".equalsIgnoreCase(key))
+ {
+ setWarmup(parseInt("Bad warmup count", value));
+ }
+ else if("-ack".equalsIgnoreCase(key))
+ {
+ ackMode = parseInt("Bad ack mode", value);
+ }
+ else if("-factory".equalsIgnoreCase(key))
+ {
+ factory = value;
+ }
+ else if("-clientId".equalsIgnoreCase(key))
+ {
+ clientId = value;
+ }
+ else if("-subscriptionId".equalsIgnoreCase(key))
+ {
+ subscriptionId = value;
+ }
+ else if("-persistent".equalsIgnoreCase(key))
+ {
+ persistent = "true".equalsIgnoreCase(value);
+ }
+ else
+ {
+ System.out.println("Ignoring unrecognised option: " + key);
+ }
+ }
+
+ static String getAckModeDescription(int ackMode)
+ {
+ switch(ackMode)
+ {
+ case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE";
+ case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE";
+ case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE";
+ case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE";
+ case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE";
+ }
+ return "AckMode=" + ackMode;
+ }
+
+ public Connection createConnection() throws Exception
+ {
+ return new Connector().createConnection(this);
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
new file mode 100644
index 0000000000..47c608cfe4
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+public class Listener implements MessageListener
+{
+ private final Connection _connection;
+ private final MessageProducer _controller;
+ private final javax.jms.Session _session;
+ private final MessageFactory _factory;
+ private boolean init;
+ private int count;
+ private long start;
+
+ Listener(Connection connection, int ackMode) throws Exception
+ {
+ this(connection, ackMode, null);
+ }
+
+ Listener(Connection connection, int ackMode, String name) throws Exception
+ {
+ _connection = connection;
+ _session = connection.createSession(false, ackMode);
+ _factory = new MessageFactory(_session);
+
+ //register for events
+ if(name == null)
+ {
+ _factory.createTopicConsumer().setMessageListener(this);
+ }
+ else
+ {
+ _factory.createDurableTopicConsumer(name).setMessageListener(this);
+ }
+
+ _connection.start();
+
+ _controller = _factory.createControlPublisher();
+ System.out.println("Waiting for messages " +
+ Config.getAckModeDescription(ackMode)
+ + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
+ + "...");
+
+ }
+
+ private void shutdown()
+ {
+ try
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ try
+ {
+ String msg = getReport();
+ _controller.send(_factory.createReportResponseMessage(msg));
+ System.out.println("Sent report: " + msg);
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport()
+ {
+ long time = (System.currentTimeMillis() - start);
+ return "Received " + count + " in " + time + "ms";
+ }
+
+ public void onMessage(Message message)
+ {
+ if(!init)
+ {
+ start = System.currentTimeMillis();
+ count = 0;
+ init = true;
+ }
+
+ if(_factory.isShutdown(message))
+ {
+ shutdown();
+ }
+ else if(_factory.isReport(message))
+ {
+ //send a report:
+ report();
+ init = false;
+ }
+ else if (++count % 100 == 0)
+ {
+ System.out.println("Received " + count + " messages.");
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setOptions(argv);
+
+ Connection con = config.createConnection();
+ if(config.getClientId() != null)
+ {
+ con.setClientID(config.getClientId());
+ }
+ new Listener(con, config.getAckMode(), config.getSubscriptionId());
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
new file mode 100644
index 0000000000..1520f18408
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+import javax.jms.*;
+
+/**
+ */
+class MessageFactory
+{
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+ private final Session _session;
+ private final Topic _topic;
+ private final Topic _control;
+ private final byte[] _payload;
+
+
+ MessageFactory(Session session) throws JMSException
+ {
+ this(session, 256);
+ }
+
+ MessageFactory(Session session, int size) throws JMSException
+ {
+ _session = session;
+ if(session instanceof AMQSession)
+ {
+ _topic = new AMQTopic("topictest.messages");
+ _control = new AMQTopic("topictest.control");
+ }
+ else
+ {
+ _topic = session.createTopic("topictest.messages");
+ _control = session.createTopic("topictest.control");
+ }
+ _payload = new byte[size];
+
+ for(int i = 0; i < size; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
+ }
+
+ Topic getTopic()
+ {
+ return _topic;
+ }
+
+ Message createEventMessage() throws JMSException
+ {
+ BytesMessage msg = _session.createBytesMessage();
+ msg.writeBytes(_payload);
+ return msg;
+ }
+
+ Message createShutdownMessage() throws JMSException
+ {
+ return _session.createTextMessage("SHUTDOWN");
+ }
+
+ Message createReportRequestMessage() throws JMSException
+ {
+ return _session.createTextMessage("REPORT");
+ }
+
+ Message createReportResponseMessage(String msg) throws JMSException
+ {
+ return _session.createTextMessage(msg);
+ }
+
+ boolean isShutdown(Message m)
+ {
+ return checkText(m, "SHUTDOWN");
+ }
+
+ boolean isReport(Message m)
+ {
+ return checkText(m, "REPORT");
+ }
+
+ Object getReport(Message m)
+ {
+ try
+ {
+ return ((TextMessage) m).getText();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return e.toString();
+ }
+ }
+
+ MessageConsumer createTopicConsumer() throws Exception
+ {
+ return _session.createConsumer(_topic);
+ }
+
+ MessageConsumer createDurableTopicConsumer(String name) throws Exception
+ {
+ return _session.createDurableSubscriber(_topic, name);
+ }
+
+ MessageConsumer createControlConsumer() throws Exception
+ {
+ return _session.createConsumer(_control);
+ }
+
+ MessageProducer createTopicPublisher() throws Exception
+ {
+ return _session.createProducer(_topic);
+ }
+
+ MessageProducer createControlPublisher() throws Exception
+ {
+ return _session.createProducer(_control);
+ }
+
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+ return false;
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java
new file mode 100644
index 0000000000..d788029ee9
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import javax.jms.*;
+
+public class Publisher implements MessageListener
+{
+ private final Object _lock = new Object();
+ private final Connection _connection;
+ private final Session _session;
+ private final MessageFactory _factory;
+ private final MessageProducer _publisher;
+ private int _count;
+
+ Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception
+ {
+ _connection = connection;
+ _session = _connection.createSession(false, ackMode);
+ _factory = new MessageFactory(_session, size);
+ _publisher = _factory.createTopicPublisher();
+ _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + ".");
+ }
+
+ private void test(Config config) throws Exception
+ {
+ test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup());
+ }
+
+ private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception
+ {
+ _factory.createControlConsumer().setMessageListener(this);
+ _connection.start();
+
+ if(warmup > 0)
+ {
+ System.out.println("Runing warmup (" + warmup + " msgs)");
+ long time = batch(warmup, consumerCount);
+ System.out.println("Warmup completed in " + time + "ms");
+ }
+
+ long[] times = new long[batches];
+ for(int i = 0; i < batches; i++)
+ {
+ if(i > 0) Thread.sleep(delay*1000);
+ times[i] = batch(msgCount, consumerCount);
+ System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
+ }
+
+ long min = min(times);
+ long max = max(times);
+ System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
+
+ //request shutdown
+ _publisher.send(_factory.createShutdownMessage());
+
+ _connection.stop();
+ _connection.close();
+ }
+
+ private long batch(int msgCount, int consumerCount) throws Exception
+ {
+ _count = consumerCount;
+ long start = System.currentTimeMillis();
+ publish(msgCount);
+ waitForCompletion(consumerCount);
+ return System.currentTimeMillis() - start;
+ }
+
+ private void publish(int count) throws Exception
+ {
+
+ //send events
+ for (int i = 0; i < count; i++)
+ {
+ _publisher.send(_factory.createEventMessage());
+ if ((i + 1) % 100 == 0)
+ {
+ System.out.println("Sent " + (i + 1) + " messages");
+ }
+ }
+
+ //request report
+ _publisher.send(_factory.createReportRequestMessage());
+ }
+
+ private void waitForCompletion(int consumers) throws Exception
+ {
+ System.out.println("Waiting for completion...");
+ synchronized (_lock)
+ {
+ while (_count > 0)
+ {
+ _lock.wait();
+ }
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining");
+ if (_count == 0)
+ {
+ synchronized (_lock)
+ {
+ _lock.notify();
+ }
+ }
+ }
+
+ static long min(long[] times)
+ {
+ long min = times.length > 0 ? times[0] : 0;
+ for(int i = 0; i < times.length; i++)
+ {
+ min = Math.min(min, times[i]);
+ }
+ return min;
+ }
+
+ static long max(long[] times)
+ {
+ long max = times.length > 0 ? times[0] : 0;
+ for(int i = 0; i < times.length; i++)
+ {
+ max = Math.max(max, times[i]);
+ }
+ return max;
+ }
+
+ static long avg(long[] times, long min, long max)
+ {
+ long sum = 0;
+ for(int i = 0; i < times.length; i++)
+ {
+ sum += times[i];
+ }
+ sum -= min;
+ sum -= max;
+
+ return (sum / (times.length - 2));
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ Config config = new Config();
+ config.setOptions(argv);
+
+ Connection con = config.createConnection();
+ int size = config.getPayload();
+ int ackMode = config.getAckMode();
+ boolean persistent = config.usePersistentMessages();
+ new Publisher(con, size, ackMode, persistent).test(config);
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index dd63bbb100..0f1f016a54 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -40,6 +40,14 @@
<maven>2.0.4</maven>
</prerequisites>
+ <distributionManagement>
+ <snapshotRepository>
+ <id>apache.snapshots</id>
+ <name>Apache SNAPSHOT Repository</name>
+ <url>scp://people.apache.org/www/people.apache.org/repo/m2-snapshot-repository</url>
+ </snapshotRepository>
+ </distributionManagement>
+
<inceptionYear>2006</inceptionYear>
<mailingLists>
<mailingList>
diff --git a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
new file mode 100644
index 0000000000..21ad1b6a7f
--- /dev/null
+++ b/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedBroker;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+
+public class AMQBrokerManagerMBeanTest extends TestCase
+{
+ private QueueRegistry _queueRegistry;
+ private ExchangeRegistry _exchangeRegistry;
+
+ public void testExchangeOperations() throws Exception
+ {
+ String exchange1 = "testExchange1_" + System.currentTimeMillis();
+ String exchange2 = "testExchange2_" + System.currentTimeMillis();
+ String exchange3 = "testExchange3_" + System.currentTimeMillis();
+
+ assertTrue(_exchangeRegistry.getExchange(exchange1) == null);
+ assertTrue(_exchangeRegistry.getExchange(exchange2) == null);
+ assertTrue(_exchangeRegistry.getExchange(exchange3) == null);
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean();
+ mbean.createNewExchange(exchange1,"direct",false, false);
+ mbean.createNewExchange(exchange2,"topic",false, false);
+ mbean.createNewExchange(exchange3,"headers",false, false);
+
+ assertTrue(_exchangeRegistry.getExchange(exchange1) != null);
+ assertTrue(_exchangeRegistry.getExchange(exchange2) != null);
+ assertTrue(_exchangeRegistry.getExchange(exchange3) != null);
+
+ mbean.unregisterExchange(exchange1);
+ mbean.unregisterExchange(exchange2);
+ mbean.unregisterExchange(exchange3);
+
+ assertTrue(_exchangeRegistry.getExchange(exchange1) == null);
+ assertTrue(_exchangeRegistry.getExchange(exchange2) == null);
+ assertTrue(_exchangeRegistry.getExchange(exchange3) == null);
+ }
+
+ public void testQueueOperations() throws Exception
+ {
+ String queueName = "testQueue_" + System.currentTimeMillis();
+ ManagedBroker mbean = new AMQBrokerManagerMBean();
+
+ assertTrue(_queueRegistry.getQueue(queueName) == null);
+
+ mbean.createNewQueue(queueName, false, "test", true);
+ assertTrue(_queueRegistry.getQueue(queueName) != null);
+
+ mbean.deleteQueue(queueName);
+ assertTrue(_queueRegistry.getQueue(queueName) == null);
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ _queueRegistry = appRegistry.getQueueRegistry();
+ _exchangeRegistry = appRegistry.getExchangeRegistry();
+ }
+}