diff options
Diffstat (limited to 'java')
26 files changed, 2080 insertions, 182 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java new file mode 100644 index 0000000000..509f57be7f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -0,0 +1,192 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server; + +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedBroker; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.AMQException; + +import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.ObjectName; +import javax.management.MalformedObjectNameException; + +/** + * This MBean implements the broker management interface and exposes the + * Broker level management features like creating and deleting exchanges and queue. + */ +@MBeanDescription("This MBean exposes the broker level management features") +public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBroker +{ + private final QueueRegistry _queueRegistry; + private final ExchangeRegistry _exchangeRegistry; + private final ExchangeFactory _exchangeFactory; + private final MessageStore _messageStore; + + @MBeanConstructor("Creates the Broker Manager MBean") + public AMQBrokerManagerMBean() throws JMException + { + super(ManagedBroker.class, ManagedBroker.TYPE); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _queueRegistry = appRegistry.getQueueRegistry(); + _exchangeRegistry = appRegistry.getExchangeRegistry(); + _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); + _messageStore = ApplicationRegistry.getInstance().getMessageStore(); + } + + public String getObjectInstanceName() + { + return this.getClass().getName(); + } + + /** + * Creates new exchange and registers it with the registry. + * + * @param exchangeName + * @param type + * @param durable + * @param autoDelete + * @throws JMException + */ + public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) + throws JMException + { + try + { + synchronized (_exchangeRegistry) + { + Exchange exchange = _exchangeRegistry.getExchange(exchangeName); + if (exchange == null) + { + exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0); + _exchangeRegistry.registerExchange(exchange); + } + else + { + throw new JMException("The exchange \"" + exchangeName + "\" already exists."); + } + } + } + catch (AMQException ex) + { + throw new MBeanException(ex, "Error in creating exchange " + exchangeName); + } + } + + /** + * Unregisters the exchange from registry. + * + * @param exchangeName + * @throws JMException + */ + public void unregisterExchange(String exchangeName) throws JMException + { + // TODO + // Check if the exchange is in use. + // boolean inUse = false; + // Check if there are queue-bindings with the exchange and unregister + // when there are no bindings. + try + { + _exchangeRegistry.unregisterExchange(exchangeName, false); + } + catch (AMQException ex) + { + throw new MBeanException(ex, "Error in unregistering exchange " + exchangeName); + } + } + + /** + * Creates a new queue and registers it with the registry and puts it + * in persistance storage if durable queue. + * + * @param queueName + * @param durable + * @param owner + * @param autoDelete + * @throws JMException + */ + public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete) + throws JMException + { + AMQQueue queue = _queueRegistry.getQueue(queueName); + if (queue != null) + { + throw new JMException("The queue \"" + queueName + "\" already exists."); + } + + try + { + queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry); + if (queue.isDurable() && !queue.isAutoDelete()) + { + _messageStore.createQueue(queue); + } + _queueRegistry.registerQueue(queue); + } + catch (AMQException ex) + { + throw new MBeanException(ex,"Error in creating queue " + queueName); + } + } + + /** + * Deletes the queue from queue registry and persistant storage. + * + * @param queueName + * @throws JMException + */ + public void deleteQueue(String queueName) throws JMException + { + AMQQueue queue = _queueRegistry.getQueue(queueName); + if (queue == null) + { + throw new JMException("The Queue " + queueName + " is not a registerd queue."); + } + + try + { + queue.delete(); + _messageStore.removeQueue(queueName); + + } + catch (AMQException ex) + { + throw new MBeanException(ex, ex.toString()); + } + } + + public ObjectName getObjectName() throws MalformedObjectNameException + { + StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); + objectName.append(":type=").append(getType()); + + return new ObjectName(objectName.toString()); + } +} // End of MBean class diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 553aecc217..ffd25de0b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -40,29 +40,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.ManagedBroker; +import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; import org.apache.qpid.server.protocol.AMQPProtocolProvider; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.transport.ConnectorConfiguration; import org.apache.qpid.url.URLSyntaxException; import javax.management.JMException; -import javax.management.MBeanException; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; import java.io.File; import java.io.IOException; import java.net.InetAddress; @@ -75,7 +62,7 @@ import java.util.StringTokenizer; * Main entry point for AMQPD. * */ -public class Main implements ProtocolVersionList +public class Main implements ProtocolVersionList, Managable { private static final Logger _logger = Logger.getLogger(Main.class); @@ -83,6 +70,8 @@ public class Main implements ProtocolVersionList private static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; + private AMQBrokerManagerMBean _mbean = null; + protected static class InitException extends Exception { InitException(String msg) @@ -445,7 +434,8 @@ public class Main implements ProtocolVersionList { try { - new AMQBrokerManager().register(); + _mbean = new AMQBrokerManagerMBean(); + _mbean.register(); } catch (JMException ex) { @@ -453,156 +443,8 @@ public class Main implements ProtocolVersionList } } - /** - * AMQPBrokerMBean implements the broker management interface and exposes the - * Broker level management features like creating and deleting exchanges and queue. - */ - @MBeanDescription("This MBean exposes the broker level management features") - private final class AMQBrokerManager extends AMQManagedObject implements ManagedBroker + public ManagedObject getManagedObject() { - private final QueueRegistry _queueRegistry; - private final ExchangeRegistry _exchangeRegistry; - private final ExchangeFactory _exchangeFactory; - private final MessageStore _messageStore; - - @MBeanConstructor("Creates the Broker Manager MBean") - protected AMQBrokerManager() throws JMException - { - super(ManagedBroker.class, ManagedBroker.TYPE); - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getQueueRegistry(); - _exchangeRegistry = appRegistry.getExchangeRegistry(); - _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); - _messageStore = ApplicationRegistry.getInstance().getMessageStore(); - } - - public String getObjectInstanceName() - { - return this.getClass().getName(); - } - - /** - * Creates new exchange and registers it with the registry. - * @param exchangeName - * @param type - * @param durable - * @param autoDelete - * @throws JMException - */ - public void createNewExchange(String exchangeName, String type, boolean durable, boolean autoDelete) - throws JMException - { - try - { - synchronized(_exchangeRegistry) - { - Exchange exchange = _exchangeRegistry.getExchange(exchangeName); - if (exchange == null) - { - exchange = _exchangeFactory.createExchange(exchangeName, type, durable, autoDelete, 0); - _exchangeRegistry.registerExchange(exchange); - } - else - { - throw new JMException("The exchange \"" + exchangeName + "\" already exists."); - } - } - } - catch(AMQException ex) - { - _logger.error("Error in creating exchange " + exchangeName, ex); - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * Unregisters the exchange from registry. - * @param exchangeName - * @throws JMException - */ - public void unregisterExchange(String exchangeName) throws JMException - { - // TODO - // Check if the exchange is in use. - // boolean inUse = false; - // Check if there are queue-bindings with the exchange and unregister - // when there are no bindings. - try - { - _exchangeRegistry.unregisterExchange(exchangeName, false); - } - catch(AMQException ex) - { - _logger.error("Error in unregistering exchange " + exchangeName, ex); - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * Creates a new queue and registers it with the registry and puts it - * in persistance storage if durable queue. - * @param queueName - * @param durable - * @param owner - * @param autoDelete - * @throws JMException - */ - public void createNewQueue(String queueName, boolean durable, String owner, boolean autoDelete) - throws JMException - { - AMQQueue queue = _queueRegistry.getQueue(queueName); - if (queue != null) - { - throw new JMException("The queue \"" + queueName + "\" already exists."); - } - - try - { - queue = new AMQQueue(queueName, durable, owner, autoDelete, _queueRegistry); - if (queue.isDurable() && !queue.isAutoDelete()) - { - _messageStore.createQueue(queue); - } - _queueRegistry.registerQueue(queue); - } - catch (AMQException ex) - { - _logger.error("Error in creating queue " + queueName, ex); - throw new MBeanException(ex, ex.toString()); - } - } - - /** - * Deletes the queue from queue registry and persistant storage. - * @param queueName - * @throws JMException - */ - public void deleteQueue(String queueName) throws JMException - { - AMQQueue queue = _queueRegistry.getQueue(queueName); - if (queue == null) - { - throw new JMException("The Queue " + queueName + " is not a registerd queue."); - } - - try - { - queue.delete(); - _messageStore.removeQueue(queueName); - - } - catch (AMQException ex) - { - throw new MBeanException(ex, ex.toString()); - } - } - - public ObjectName getObjectName() throws MalformedObjectNameException - { - StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN); - objectName.append(":type=").append(getType()); - - return new ObjectName(objectName.toString()); - } - } // End of MBean class + return _mbean; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 23e2754eb2..93baa3fc29 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -213,7 +213,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { return; } - _log.info("Async Delivery Message:" + message + " to :" + sub); + if (_log.isDebugEnabled()) + { + _log.debug("Async Delivery Message:" + message + " to :" + sub); + } sub.send(message, _queue); @@ -278,7 +281,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void deliver(String name, AMQMessage msg) throws AMQException { - _log.info(id() + "deliver :" + System.identityHashCode(msg)); + if (_log.isDebugEnabled()) + { + _log.debug(id() + "deliver :" + System.identityHashCode(msg)); + } //Check if we have someone to deliver the message to. _lock.lock(); @@ -288,7 +294,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (s == null) //no-one can take the message right now. { - _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery"); + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery"); + } if (!msg.getPublishBody().immediate) { addMessageToQueue(msg); @@ -297,21 +306,33 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.unlock(); //Pre Deliver to all subscriptions - _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to."); + if (_log.isDebugEnabled()) + { + _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + " subscribers to give the message to."); + } for (Subscription sub : _subscriptions.getSubscriptions()) { // stop if the message gets delivered whilst PreDelivering if we have a shared queue. if (_queue.isShared() && msg.getDeliveredToConsumer()) { - _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + + ") is already delivered."); + } continue; } // Only give the message to those that want them. if (sub.hasInterest(msg)) { - _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); + } sub.enqueueForPreDelivery(msg); } } @@ -322,7 +343,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //release lock now _lock.unlock(); - _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s); + if (_log.isDebugEnabled()) + { + _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + System.identityHashCode(s) + ") :" + s); + } //Deliver the message s.send(msg, _queue); } @@ -330,7 +355,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager finally { //ensure lock is released - if (_lock.isLocked()) + if (_lock.isHeldByCurrentThread()) { _lock.unlock(); } @@ -371,9 +396,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void processAsync(Executor executor) { - _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + - " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get()); + if (_log.isDebugEnabled()) + { + _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" + + " Active:" + _subscriptions.hasActiveSubscribers() + + " Processing:" + _processing.get()); + } if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java index c8de298ba1..c9d29d8077 100644 --- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java @@ -132,10 +132,6 @@ public class QueueSenderAdapter implements QueueSender { throw new javax.jms.IllegalStateException("Publisher is closed"); } - if(queue == null){ - throw new UnsupportedOperationException("Queue is null"); - } - AMQSession session = ((BasicMessageProducer) _delegate).getSession(); if(session == null || session.isClosed()){ diff --git a/java/cluster/src/test/java/log4j.properties b/java/cluster/src/test/java/log4j.properties new file mode 100644 index 0000000000..6d596d1d19 --- /dev/null +++ b/java/cluster/src/test/java/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +log4j.rootLogger=${root.logging.level} + + +log4j.logger.org.apache.qpid=${amqj.logging.level}, console +log4j.additivity.org.apache.qpid=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=all +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/perftests/bin/run_many.sh b/java/perftests/bin/run_many.sh new file mode 100755 index 0000000000..cca2ffec21 --- /dev/null +++ b/java/perftests/bin/run_many.sh @@ -0,0 +1,30 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# args: +# <number of processes to start> +# <name of run> +# <command ro run> + +for i in `seq 1 $1`; do + $3 >$2.$i.out 2>>$2.err & + echo $! > $2.$i.pid +done; diff --git a/java/perftests/bin/serviceProvidingClient.sh b/java/perftests/bin/serviceProvidingClient.sh new file mode 100755 index 0000000000..6b00486cd2 --- /dev/null +++ b/java/perftests/bin/serviceProvidingClient.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# XXX -Xms1024m -XX:NewSize=300m +. ./setupclasspath.sh +echo $CP +# usage: just pass in the host(s) +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level=INFO org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ diff --git a/java/perftests/bin/serviceRequestingClient.sh b/java/perftests/bin/serviceRequestingClient.sh new file mode 100755 index 0000000000..7dd3d63c27 --- /dev/null +++ b/java/perftests/bin/serviceRequestingClient.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# args supplied: <host:port> <num messages> +thehosts=$1 +shift +echo $thehosts +# XXX -Xms1024m -XX:NewSize=300m +. ./setupclasspath.sh +echo $CP +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@" diff --git a/java/perftests/bin/setupclasspath.sh b/java/perftests/bin/setupclasspath.sh new file mode 100755 index 0000000000..a660392e77 --- /dev/null +++ b/java/perftests/bin/setupclasspath.sh @@ -0,0 +1,9 @@ +if [ -z $QPID_HOME ] ; then + echo "QPID_HOME must be set" + exit +fi +CP=$QPID_HOME/lib/qpid-incubating.jar:../target/classes + +if [ `uname -o` == "Cygwin" ] ; then + CP=`cygpath --path --windows $CP` +fi diff --git a/java/perftests/bin/topicListener.sh b/java/perftests/bin/topicListener.sh new file mode 100755 index 0000000000..454efefe7d --- /dev/null +++ b/java/perftests/bin/topicListener.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# XXX -Xmx512m -Xms512m -XX:NewSize=150m +. ./setupclasspath.sh +echo $CP +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.topic.Listener $* diff --git a/java/perftests/bin/topicPublisher.sh b/java/perftests/bin/topicPublisher.sh new file mode 100755 index 0000000000..cc3a8736cc --- /dev/null +++ b/java/perftests/bin/topicPublisher.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# XXX -Xmx512m -Xms512m -XX:NewSize=150m +. ./setupclasspath.sh +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.topic.Publisher $* diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml new file mode 100644 index 0000000000..3af906c4ac --- /dev/null +++ b/java/perftests/pom.xml @@ -0,0 +1,55 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-perftests</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid Performance Tests</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + </dependency> + </dependencies> + +<!-- <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + </plugin> + </plugins> + </build>--> +</project> diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..cac0064785 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/config/AMQConnectionFactoryInitialiser.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.ConnectorConfig; + +import javax.jms.ConnectionFactory; + +class AMQConnectionFactoryInitialiser implements ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) + { + return new AMQConnectionFactory(config.getHost(), config.getPort(), "/test_path"); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java new file mode 100644 index 0000000000..04381d66a0 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/config/AbstractConfig.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +public abstract class AbstractConfig +{ + public boolean setOptions(String[] argv) + { + try + { + for(int i = 0; i < argv.length - 1; i += 2) + { + String key = argv[i]; + String value = argv[i+1]; + setOption(key, value); + } + return true; + } + catch(Exception e) + { + System.out.println(e.getMessage()); + } + return false; + } + + protected int parseInt(String msg, String i) + { + try + { + return Integer.parseInt(i); + } + catch(NumberFormatException e) + { + throw new RuntimeException(msg + ": " + i); + } + } + + protected long parseLong(String msg, String i) + { + try + { + return Long.parseLong(i); + } + catch(NumberFormatException e) + { + throw new RuntimeException(msg + ": " + i); + } + } + + public abstract void setOption(String key, String value); +} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..a9984eb09a --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/config/ConnectionFactoryInitialiser.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +public interface ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException; +} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/Connector.java b/java/perftests/src/main/java/org/apache/qpid/config/Connector.java new file mode 100644 index 0000000000..ff2377f087 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/config/Connector.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +public class Connector +{ + public Connection createConnection(ConnectorConfig config) throws Exception + { + return getConnectionFactory(config).createConnection(); + } + + ConnectionFactory getConnectionFactory(ConnectorConfig config) throws Exception + { + String factory = config.getFactory(); + if(factory == null) factory = AMQConnectionFactoryInitialiser.class.getName(); + System.out.println("Using " + factory); + return ((ConnectionFactoryInitialiser) Class.forName(factory).newInstance()).getFactory(config); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java b/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java new file mode 100644 index 0000000000..b120ed3f12 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/config/ConnectorConfig.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +public interface ConnectorConfig +{ + public String getHost(); + public int getPort(); + public String getFactory(); +} diff --git a/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java new file mode 100644 index 0000000000..44285efd96 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/config/JBossConnectionFactoryInitialiser.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.config; + +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.ConnectorConfig; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.MBeanException; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.naming.NameNotFoundException; +import java.util.Hashtable; + +public class JBossConnectionFactoryInitialiser implements ConnectionFactoryInitialiser +{ + public ConnectionFactory getFactory(ConnectorConfig config) throws JMSException + { + ConnectionFactory cf = null; + InitialContext ic = null; + Hashtable ht = new Hashtable(); + ht.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); + String jbossHost = System.getProperty("jboss.host", "eqd-lxamq01"); + String jbossPort = System.getProperty("jboss.port", "1099"); + ht.put(InitialContext.PROVIDER_URL, "jnp://" + jbossHost + ":" + jbossPort); + ht.put(InitialContext.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); + + try + { + ic = new InitialContext(ht); + if (!doesDestinationExist("topictest.messages", ic)) + { + deployTopic("topictest.messages", ic); + } + if (!doesDestinationExist("topictest.control", ic)) + { + deployTopic("topictest.control", ic); + } + + cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); + return cf; + } + catch (NamingException e) + { + throw new JMSException("Unable to lookup object: " + e); + } + catch (Exception e) + { + throw new JMSException("Error creating topic: " + e); + } + } + + private boolean doesDestinationExist(String name, InitialContext ic) throws Exception + { + try + { + ic.lookup("/" + name); + } + catch (NameNotFoundException e) + { + return false; + } + return true; + } + + private void deployTopic(String name, InitialContext ic) throws Exception + { + MBeanServerConnection mBeanServer = lookupMBeanServerProxy(ic); + + ObjectName serverObjectName = new ObjectName("jboss.messaging:service=ServerPeer"); + + String jndiName = "/" + name; + try + { + mBeanServer.invoke(serverObjectName, "createTopic", + new Object[]{name, jndiName}, + new String[]{"java.lang.String", "java.lang.String"}); + } + catch (MBeanException e) + { + System.err.println("Error: " + e); + System.err.println("Cause: " + e.getCause()); + } + } + + private MBeanServerConnection lookupMBeanServerProxy(InitialContext ic) throws NamingException + { + return (MBeanServerConnection) ic.lookup("jmx/invoker/RMIAdaptor"); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java new file mode 100644 index 0000000000..ddee643a76 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.jms.Session; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.*; +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class ServiceProvidingClient +{ + private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class); + + private MessageProducer _destinationProducer; + + private Destination _responseDest; + + private AMQConnection _connection; + + public ServiceProvidingClient(String brokerDetails, String username, String password, + String clientName, String virtualPath, String serviceName) + throws AMQException, JMSException, URLSyntaxException + { + _connection = new AMQConnection(brokerDetails, username, password, + clientName, virtualPath); + _connection.setConnectionListener(new ConnectionListener() + { + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback"); + } + }); + final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _logger.info("Service (queue) name is '" + serviceName + "'..."); + + AMQQueue destination = new AMQQueue(serviceName); + + MessageConsumer consumer = session.createConsumer(destination, + 100, true, false, null); + + consumer.setMessageListener(new MessageListener() + { + private int _messageCount; + + public void onMessage(Message message) + { + //_logger.info("Got message '" + message + "'"); + + TextMessage tm = (TextMessage) message; + + try + { + Destination responseDest = tm.getJMSReplyTo(); + if (responseDest == null) + { + _logger.info("Producer not created because the response destination is null."); + return; + } + + if (!responseDest.equals(_responseDest)) + { + _responseDest = responseDest; + + _logger.info("About to create a producer"); + _destinationProducer = session.createProducer(responseDest); + _destinationProducer.setDisableMessageTimestamp(true); + _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + _logger.info("After create a producer"); + } + } + catch (JMSException e) + { + _logger.error("Error creating destination"); + } + _messageCount++; + if (_messageCount % 1000 == 0) + { + _logger.info("Received message total: " + _messageCount); + _logger.info("Sending response to '" + _responseDest + "'"); + } + + try + { + String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText(); + TextMessage msg = session.createTextMessage(payload); + if (tm.propertyExists("timeSent")) + { + _logger.info("timeSent property set on message"); + _logger.info("timeSent value is: " + tm.getLongProperty("timeSent")); + msg.setStringProperty("timeSent", tm.getStringProperty("timeSent")); + } + _destinationProducer.send(msg); + if (_messageCount % 1000 == 0) + { + _logger.info("Sent response to '" + _responseDest + "'"); + } + } + catch (JMSException e) + { + _logger.error("Error sending message: " + e, e); + } + } + }); + } + + public void run() throws JMSException + { + _connection.start(); + _logger.info("Waiting..."); + } + + public static void main(String[] args) + { + _logger.info("Starting..."); + + if (args.length < 5) + { + System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]"); + System.exit(1); + } + String clientId = null; + try + { + InetAddress address = InetAddress.getLocalHost(); + clientId = address.getHostName() + System.currentTimeMillis(); + } + catch (UnknownHostException e) + { + _logger.error("Error: " + e, e); + } + + try + { + ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2], + clientId, args[3], args[4]); + client.run(); + } + catch (JMSException e) + { + _logger.error("Error: " + e, e); + } + catch (AMQException e) + { + _logger.error("Error: " + e, e); + } + catch (URLSyntaxException e) + { + _logger.error("Error: " + e, e); + } + + + + } + +} + diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java new file mode 100644 index 0000000000..b52d06558a --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java @@ -0,0 +1,303 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.jms.MessageConsumer; +import org.apache.qpid.jms.MessageProducer; +import org.apache.qpid.jms.Session; + +import javax.jms.*; +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * A client that behaves as follows: + * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li> + * <li>Creates a temporary queue</li> + * <li>Creates messages containing a property that is the name of the temporary queue</li> + * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li> + * </ul> + * + */ +public class ServiceRequestingClient implements ExceptionListener +{ + private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class); + + private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk "; + + private String MESSAGE_DATA; + + private AMQConnection _connection; + + private Session _session; + + private long _averageLatency; + + private int _messageCount; + + private volatile boolean _completed; + + private AMQDestination _tempDestination; + + private MessageProducer _producer; + + private Object _waiter; + + private static String createMessagePayload(int size) + { + _log.info("Message size set to " + size + " bytes"); + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count < size + MESSAGE_DATA_BYTES.length()) + { + buf.append(MESSAGE_DATA_BYTES); + count += MESSAGE_DATA_BYTES.length(); + } + if (count < size) + { + buf.append(MESSAGE_DATA_BYTES, 0, size - count); + } + + return buf.toString(); + } + + private class CallbackHandler implements MessageListener + { + private int _expectedMessageCount; + + private int _actualMessageCount; + + private long _startTime; + + public CallbackHandler(int expectedMessageCount, long startTime) + { + _expectedMessageCount = expectedMessageCount; + _startTime = startTime; + } + + public void onMessage(Message m) + { + if (_log.isDebugEnabled()) + { + _log.debug("Message received: " + m); + } + try + { + m.getPropertyNames(); + if (m.propertyExists("timeSent")) + { + long timeSent = Long.parseLong(m.getStringProperty("timeSent")); + long now = System.currentTimeMillis(); + if (_averageLatency == 0) + { + _averageLatency = now - timeSent; + _log.info("Latency " + _averageLatency); + } + else + { + _log.info("Individual latency: " + (now - timeSent)); + _averageLatency = (_averageLatency + (now - timeSent)) / 2; + _log.info("Average latency now: " + _averageLatency); + } + } + } + catch (JMSException e) + { + _log.error("Error getting latency data: " + e, e); + } + _actualMessageCount++; + if (_actualMessageCount % 1000 == 0) + { + _log.info("Received message count: " + _actualMessageCount); + } + + if (_actualMessageCount == _expectedMessageCount) + { + _completed = true; + notifyWaiter(); + long timeTaken = System.currentTimeMillis() - _startTime; + _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " + + timeTaken + "ms, equivalent to " + + (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second"); + + try + { + _connection.close(); + _log.info("Connection closed"); + } + catch (JMSException e) + { + _log.error("Error closing connection"); + } + } + } + } + + private void notifyWaiter() + { + if (_waiter != null) + { + synchronized (_waiter) + { + _waiter.notify(); + } + } + } + public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password, + String vpath, String commandQueueName, + final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException + { + _messageCount = messageCount; + MESSAGE_DATA = createMessagePayload(messageDataLength); + try + { + createConnection(brokerHosts, clientID, username, password, vpath); + _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + _connection.setExceptionListener(this); + + + AMQQueue destination = new AMQQueue(commandQueueName); + _producer = (MessageProducer) _session.createProducer(destination); + _producer.setDisableMessageTimestamp(true); + _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + _tempDestination = new AMQQueue("TempResponse" + + Long.toString(System.currentTimeMillis()), true); + MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true, + true, null); + + //Send first message, then wait a bit to allow the provider to get initialised + TextMessage first = _session.createTextMessage(MESSAGE_DATA); + first.setJMSReplyTo(_tempDestination); + _producer.send(first); + try + { + Thread.sleep(1000); + } + catch (InterruptedException ignore) + { + } + + //now start the clock and the test... + final long startTime = System.currentTimeMillis(); + + messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime)); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + /** + * Run the test and notify an object upon receipt of all responses. + * @param waiter the object that will be notified + * @throws JMSException + */ + public void run(Object waiter) throws JMSException + { + _waiter = waiter; + _connection.start(); + for (int i = 1; i < _messageCount; i++) + { + TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i); + msg.setJMSReplyTo(_tempDestination); + if (i % 1000 == 0) + { + long timeNow = System.currentTimeMillis(); + msg.setStringProperty("timeSent", String.valueOf(timeNow)); + } + _producer.send(msg); + } + _log.info("Finished sending " + _messageCount + " messages"); + } + + public boolean isCompleted() + { + return _completed; + } + + private void createConnection(String brokerHosts, String clientID, String username, String password, + String vpath) throws AMQException, URLSyntaxException + { + _connection = new AMQConnection(brokerHosts, username, password, + clientID, vpath); + } + + /** + * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank + * means the server will allocate a name. + */ + public static void main(String[] args) + { + if (args.length < 6) + { + System.err.println( + "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>"); + } + try + { + int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096; + + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); + ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3], + args[4], Integer.parseInt(args[5]), + messageDataLength); + Object waiter = new Object(); + client.run(waiter); + synchronized (waiter) + { + while (!client.isCompleted()) + { + waiter.wait(); + } + } + + } + catch (UnknownHostException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (Exception e) + { + System.err.println("Error in client: " + e); + e.printStackTrace(); + } + } + + /** + * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) + */ + public void onException(JMSException e) + { + System.err.println(e.getMessage()); + e.printStackTrace(System.err); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java new file mode 100644 index 0000000000..bb740f9094 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -0,0 +1,243 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.config.ConnectorConfig; +import org.apache.qpid.config.ConnectionFactoryInitialiser; +import org.apache.qpid.config.Connector; +import org.apache.qpid.config.AbstractConfig; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +class Config extends AbstractConfig implements ConnectorConfig +{ + + private String host = "localhost"; + private int port = 5672; + private String factory = null; + + private int payload = 256; + private int messages = 1000; + private int clients = 1; + private int batch = 1; + private long delay = 1; + private int warmup; + private int ackMode= AMQSession.NO_ACKNOWLEDGE; + private String clientId; + private String subscriptionId; + private boolean persistent; + + public Config() + { + } + + int getAckMode() + { + return ackMode; + } + + void setPayload(int payload) + { + this.payload = payload; + } + + int getPayload() + { + return payload; + } + + void setClients(int clients) + { + this.clients = clients; + } + + int getClients() + { + return clients; + } + + void setMessages(int messages) + { + this.messages = messages; + } + + int getMessages() + { + return messages; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public String getFactory() + { + return factory; + } + + public void setPort(int port) + { + this.port = port; + } + + int getBatch() + { + return batch; + } + + void setBatch(int batch) + { + this.batch = batch; + } + + int getWarmup() + { + return warmup; + } + + void setWarmup(int warmup) + { + this.warmup = warmup; + } + + public long getDelay() + { + return delay; + } + + public void setDelay(long delay) + { + this.delay = delay; + } + + String getClientId() + { + return clientId; + } + + String getSubscriptionId() + { + return subscriptionId; + } + + boolean usePersistentMessages() + { + return persistent; + } + + public void setOption(String key, String value) + { + if("-host".equalsIgnoreCase(key)) + { + setHost(value); + } + else if("-port".equalsIgnoreCase(key)) + { + try + { + setPort(Integer.parseInt(value)); + } + catch(NumberFormatException e) + { + throw new RuntimeException("Bad port number: " + value); + } + } + else if("-payload".equalsIgnoreCase(key)) + { + setPayload(parseInt("Bad payload size", value)); + } + else if("-messages".equalsIgnoreCase(key)) + { + setMessages(parseInt("Bad message count", value)); + } + else if("-clients".equalsIgnoreCase(key)) + { + setClients(parseInt("Bad client count", value)); + } + else if("-batch".equalsIgnoreCase(key)) + { + setBatch(parseInt("Bad batch count", value)); + } + else if("-delay".equalsIgnoreCase(key)) + { + setDelay(parseLong("Bad batch delay", value)); + } + else if("-warmup".equalsIgnoreCase(key)) + { + setWarmup(parseInt("Bad warmup count", value)); + } + else if("-ack".equalsIgnoreCase(key)) + { + ackMode = parseInt("Bad ack mode", value); + } + else if("-factory".equalsIgnoreCase(key)) + { + factory = value; + } + else if("-clientId".equalsIgnoreCase(key)) + { + clientId = value; + } + else if("-subscriptionId".equalsIgnoreCase(key)) + { + subscriptionId = value; + } + else if("-persistent".equalsIgnoreCase(key)) + { + persistent = "true".equalsIgnoreCase(value); + } + else + { + System.out.println("Ignoring unrecognised option: " + key); + } + } + + static String getAckModeDescription(int ackMode) + { + switch(ackMode) + { + case AMQSession.NO_ACKNOWLEDGE: return "NO_ACKNOWLEDGE"; + case AMQSession.AUTO_ACKNOWLEDGE: return "AUTO_ACKNOWLEDGE"; + case AMQSession.CLIENT_ACKNOWLEDGE: return "CLIENT_ACKNOWLEDGE"; + case AMQSession.DUPS_OK_ACKNOWLEDGE: return "DUPS_OK_ACKNOWELDGE"; + case AMQSession.PRE_ACKNOWLEDGE: return "PRE_ACKNOWLEDGE"; + } + return "AckMode=" + ackMode; + } + + public Connection createConnection() throws Exception + { + return new Connector().createConnection(this); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java new file mode 100644 index 0000000000..47c608cfe4 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +public class Listener implements MessageListener +{ + private final Connection _connection; + private final MessageProducer _controller; + private final javax.jms.Session _session; + private final MessageFactory _factory; + private boolean init; + private int count; + private long start; + + Listener(Connection connection, int ackMode) throws Exception + { + this(connection, ackMode, null); + } + + Listener(Connection connection, int ackMode, String name) throws Exception + { + _connection = connection; + _session = connection.createSession(false, ackMode); + _factory = new MessageFactory(_session); + + //register for events + if(name == null) + { + _factory.createTopicConsumer().setMessageListener(this); + } + else + { + _factory.createDurableTopicConsumer(name).setMessageListener(this); + } + + _connection.start(); + + _controller = _factory.createControlPublisher(); + System.out.println("Waiting for messages " + + Config.getAckModeDescription(ackMode) + + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")") + + "..."); + + } + + private void shutdown() + { + try + { + _session.close(); + _connection.stop(); + _connection.close(); + } + catch(Exception e) + { + e.printStackTrace(System.out); + } + } + + private void report() + { + try + { + String msg = getReport(); + _controller.send(_factory.createReportResponseMessage(msg)); + System.out.println("Sent report: " + msg); + } + catch(Exception e) + { + e.printStackTrace(System.out); + } + } + + private String getReport() + { + long time = (System.currentTimeMillis() - start); + return "Received " + count + " in " + time + "ms"; + } + + public void onMessage(Message message) + { + if(!init) + { + start = System.currentTimeMillis(); + count = 0; + init = true; + } + + if(_factory.isShutdown(message)) + { + shutdown(); + } + else if(_factory.isReport(message)) + { + //send a report: + report(); + init = false; + } + else if (++count % 100 == 0) + { + System.out.println("Received " + count + " messages."); + } + } + + public static void main(String[] argv) throws Exception + { + Config config = new Config(); + config.setOptions(argv); + + Connection con = config.createConnection(); + if(config.getClientId() != null) + { + con.setClientID(config.getClientId()); + } + new Listener(con, config.getAckMode(), config.getSubscriptionId()); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java new file mode 100644 index 0000000000..1520f18408 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; + +import javax.jms.*; + +/** + */ +class MessageFactory +{ + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + + private final Session _session; + private final Topic _topic; + private final Topic _control; + private final byte[] _payload; + + + MessageFactory(Session session) throws JMSException + { + this(session, 256); + } + + MessageFactory(Session session, int size) throws JMSException + { + _session = session; + if(session instanceof AMQSession) + { + _topic = new AMQTopic("topictest.messages"); + _control = new AMQTopic("topictest.control"); + } + else + { + _topic = session.createTopic("topictest.messages"); + _control = session.createTopic("topictest.control"); + } + _payload = new byte[size]; + + for(int i = 0; i < size; i++) + { + _payload[i] = (byte) DATA[i % DATA.length]; + } + } + + Topic getTopic() + { + return _topic; + } + + Message createEventMessage() throws JMSException + { + BytesMessage msg = _session.createBytesMessage(); + msg.writeBytes(_payload); + return msg; + } + + Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return e.toString(); + } + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageConsumer createControlConsumer() throws Exception + { + return _session.createConsumer(_control); + } + + MessageProducer createTopicPublisher() throws Exception + { + return _session.createProducer(_topic); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_control); + } + + private static boolean checkText(Message m, String s) + { + try + { + return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + return false; + } + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java new file mode 100644 index 0000000000..d788029ee9 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.topic; + +import javax.jms.*; + +public class Publisher implements MessageListener +{ + private final Object _lock = new Object(); + private final Connection _connection; + private final Session _session; + private final MessageFactory _factory; + private final MessageProducer _publisher; + private int _count; + + Publisher(Connection connection, int size, int ackMode, boolean persistent) throws Exception + { + _connection = connection; + _session = _connection.createSession(false, ackMode); + _factory = new MessageFactory(_session, size); + _publisher = _factory.createTopicPublisher(); + _publisher.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + System.out.println("Publishing " + (persistent ? "persistent" : "non-persistent") + " messages of " + size + " bytes, " + Config.getAckModeDescription(ackMode) + "."); + } + + private void test(Config config) throws Exception + { + test(config.getBatch(), config.getDelay(), config.getMessages(), config.getClients(), config.getWarmup()); + } + + private void test(int batches, long delay, int msgCount, int consumerCount, int warmup) throws Exception + { + _factory.createControlConsumer().setMessageListener(this); + _connection.start(); + + if(warmup > 0) + { + System.out.println("Runing warmup (" + warmup + " msgs)"); + long time = batch(warmup, consumerCount); + System.out.println("Warmup completed in " + time + "ms"); + } + + long[] times = new long[batches]; + for(int i = 0; i < batches; i++) + { + if(i > 0) Thread.sleep(delay*1000); + times[i] = batch(msgCount, consumerCount); + System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms."); + } + + long min = min(times); + long max = max(times); + System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max)); + + //request shutdown + _publisher.send(_factory.createShutdownMessage()); + + _connection.stop(); + _connection.close(); + } + + private long batch(int msgCount, int consumerCount) throws Exception + { + _count = consumerCount; + long start = System.currentTimeMillis(); + publish(msgCount); + waitForCompletion(consumerCount); + return System.currentTimeMillis() - start; + } + + private void publish(int count) throws Exception + { + + //send events + for (int i = 0; i < count; i++) + { + _publisher.send(_factory.createEventMessage()); + if ((i + 1) % 100 == 0) + { + System.out.println("Sent " + (i + 1) + " messages"); + } + } + + //request report + _publisher.send(_factory.createReportRequestMessage()); + } + + private void waitForCompletion(int consumers) throws Exception + { + System.out.println("Waiting for completion..."); + synchronized (_lock) + { + while (_count > 0) + { + _lock.wait(); + } + } + } + + + public void onMessage(Message message) + { + System.out.println("Received report " + _factory.getReport(message) + " " + --_count + " remaining"); + if (_count == 0) + { + synchronized (_lock) + { + _lock.notify(); + } + } + } + + static long min(long[] times) + { + long min = times.length > 0 ? times[0] : 0; + for(int i = 0; i < times.length; i++) + { + min = Math.min(min, times[i]); + } + return min; + } + + static long max(long[] times) + { + long max = times.length > 0 ? times[0] : 0; + for(int i = 0; i < times.length; i++) + { + max = Math.max(max, times[i]); + } + return max; + } + + static long avg(long[] times, long min, long max) + { + long sum = 0; + for(int i = 0; i < times.length; i++) + { + sum += times[i]; + } + sum -= min; + sum -= max; + + return (sum / (times.length - 2)); + } + + public static void main(String[] argv) throws Exception + { + Config config = new Config(); + config.setOptions(argv); + + Connection con = config.createConnection(); + int size = config.getPayload(); + int ackMode = config.getAckMode(); + boolean persistent = config.usePersistentMessages(); + new Publisher(con, size, ackMode, persistent).test(config); + } +} diff --git a/java/pom.xml b/java/pom.xml index dd63bbb100..0f1f016a54 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -40,6 +40,14 @@ <maven>2.0.4</maven> </prerequisites> + <distributionManagement> + <snapshotRepository> + <id>apache.snapshots</id> + <name>Apache SNAPSHOT Repository</name> + <url>scp://people.apache.org/www/people.apache.org/repo/m2-snapshot-repository</url> + </snapshotRepository> + </distributionManagement> + <inceptionYear>2006</inceptionYear> <mailingLists> <mailingList> diff --git a/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java new file mode 100644 index 0000000000..21ad1b6a7f --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -0,0 +1,82 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server; + +import junit.framework.TestCase; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedBroker; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; + +public class AMQBrokerManagerMBeanTest extends TestCase +{ + private QueueRegistry _queueRegistry; + private ExchangeRegistry _exchangeRegistry; + + public void testExchangeOperations() throws Exception + { + String exchange1 = "testExchange1_" + System.currentTimeMillis(); + String exchange2 = "testExchange2_" + System.currentTimeMillis(); + String exchange3 = "testExchange3_" + System.currentTimeMillis(); + + assertTrue(_exchangeRegistry.getExchange(exchange1) == null); + assertTrue(_exchangeRegistry.getExchange(exchange2) == null); + assertTrue(_exchangeRegistry.getExchange(exchange3) == null); + + ManagedBroker mbean = new AMQBrokerManagerMBean(); + mbean.createNewExchange(exchange1,"direct",false, false); + mbean.createNewExchange(exchange2,"topic",false, false); + mbean.createNewExchange(exchange3,"headers",false, false); + + assertTrue(_exchangeRegistry.getExchange(exchange1) != null); + assertTrue(_exchangeRegistry.getExchange(exchange2) != null); + assertTrue(_exchangeRegistry.getExchange(exchange3) != null); + + mbean.unregisterExchange(exchange1); + mbean.unregisterExchange(exchange2); + mbean.unregisterExchange(exchange3); + + assertTrue(_exchangeRegistry.getExchange(exchange1) == null); + assertTrue(_exchangeRegistry.getExchange(exchange2) == null); + assertTrue(_exchangeRegistry.getExchange(exchange3) == null); + } + + public void testQueueOperations() throws Exception + { + String queueName = "testQueue_" + System.currentTimeMillis(); + ManagedBroker mbean = new AMQBrokerManagerMBean(); + + assertTrue(_queueRegistry.getQueue(queueName) == null); + + mbean.createNewQueue(queueName, false, "test", true); + assertTrue(_queueRegistry.getQueue(queueName) != null); + + mbean.deleteQueue(queueName); + assertTrue(_queueRegistry.getQueue(queueName) == null); + } + + @Override + protected void setUp() throws Exception + { + super.setUp(); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + _queueRegistry = appRegistry.getQueueRegistry(); + _exchangeRegistry = appRegistry.getExchangeRegistry(); + } +} |
