From 626c1e679439bd42d4486bb16d5dfca39f99c147 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 24 Aug 2014 16:17:11 +0000 Subject: QPID-6037 : [Java Client] Enhance experimental support for ADDR addressing to the 0-8/9/9-1 client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620147 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/BrokerOptions.java | 1 - .../qpid/server/exchange/AbstractExchange.java | 7 + .../java/org/apache/qpid/server/model/Broker.java | 2 - .../qpid/server/queue/QueueArgumentsConverter.java | 8 + .../src/main/resources/initial-config.json | 1 - .../v0_8/handler/ExchangeDeclareHandler.java | 16 +- .../java/org/apache/qpid/client/AMQConnection.java | 5 +- .../java/org/apache/qpid/client/AMQSession.java | 4 + .../org/apache/qpid/client/AMQSession_0_10.java | 1 + .../org/apache/qpid/client/AMQSession_0_8.java | 280 +++++++++++++++++++-- .../apache/qpid/client/BasicMessageConsumer.java | 51 ++-- .../qpid/client/BasicMessageConsumer_0_10.java | 41 +-- .../qpid/client/BasicMessageConsumer_0_8.java | 22 +- .../apache/qpid/client/BasicMessageProducer.java | 30 ++- .../qpid/client/BasicMessageProducer_0_10.java | 31 +-- .../qpid/client/BasicMessageProducer_0_8.java | 43 +++- .../apache/qpid/client/util/BlockingWaiter.java | 2 +- qpid/java/systests/etc/config-systests.json | 1 - .../destination/AddressBasedDestinationTest.java | 155 +++++++----- qpid/java/test-profiles/JavaPre010Excludes | 10 +- 20 files changed, 523 insertions(+), 188 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java index a6fae97aaa..9b3f290723 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java @@ -85,7 +85,6 @@ public class BrokerOptions Map attributes = new HashMap(); attributes.put("storePath", getConfigurationStoreLocation()); - attributes.put("storeTye", getConfigurationStoreType()); attributes.put(ConfiguredObject.CONTEXT, getConfigProperties()); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 4472669f4a..af1de8b099 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -111,6 +112,12 @@ public abstract class AbstractExchange> public AbstractExchange(Map attributes, VirtualHostImpl vhost) { super(parentsMap(vhost), attributes); + Set providedAttributeNames = new HashSet<>(attributes.keySet()); + providedAttributeNames.removeAll(getAttributeNames()); + if(!providedAttributeNames.isEmpty()) + { + throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames); + } _virtualHost = vhost; // check ACL try diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 011aaeee23..78da1227d5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -47,8 +47,6 @@ public interface Broker> extends ConfiguredObject, EventL String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost"; String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod"; String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled"; - String STORE_TYPE = "storeType"; - String STORE_VERSION = "storeVersion"; String STORE_PATH = "storePath"; String MODEL_VERSION = "modelVersion"; String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 37e82b0771..49732e8345 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -87,6 +89,8 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES); ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); + ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE); + ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION); ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY); @@ -147,6 +151,10 @@ public class QueueArgumentsConverter { value = ((Enum) value).name(); } + else if(value instanceof ConfiguredObject) + { + value = ((ConfiguredObject)value).getName(); + } wireArguments.put(entry.getKey(), value); } } diff --git a/qpid/java/broker-core/src/main/resources/initial-config.json b/qpid/java/broker-core/src/main/resources/initial-config.json index 1403b1bd12..5ee820d9ac 100644 --- a/qpid/java/broker-core/src/main/resources/initial-config.json +++ b/qpid/java/broker-core/src/main/resources/initial-config.json @@ -20,7 +20,6 @@ */ { "name": "${broker.name}", - "storeVersion": 1, "modelVersion": "2.0", "defaultVirtualHost" : "default", "authenticationproviders" : [ { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index aaf88c81d5..3f48b413ef 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; @@ -115,15 +116,22 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener attributes = new HashMap(); + Map attributes = new HashMap(); + if(body.getArguments() != null) + { + attributes.putAll(FieldTable.convertToMap(body.getArguments())); + } attributes.put(org.apache.qpid.server.model.Exchange.ID, null); attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) + { + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + } exchange = virtualHost.createExchange(attributes); } @@ -160,6 +168,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46f999e452..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession { @@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession bindings = new ArrayList(); + bindings.addAll(destination.getNode().getBindings()); + + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? + destination.getAddressName(): "amq.topic"; + + for (AMQDestination.Binding binding: bindings) + { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchange : + binding.getExchange(); + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + doBind(destination, binding, queue, exchange); + } + } } public void sendClose(long timeout) throws AMQException, FailoverException @@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQSession bindingArguments = new HashMap(); bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); - bindQueue(AMQShortString.valueOf(queueName), - AMQShortString.valueOf(dest.getSubject()), - FieldTable.convertToFieldTable(bindingArguments), - AMQShortString.valueOf(dest.getAddressName()),dest,false); + final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); + doBind(dest, binding, queueName, dest.getAddressName()); } @@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQSession( new FailoverProtectedOperation() @@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQSession arguments = node.getDeclareArgs(); + + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + // can't set alt. exchange declareExchange(AMQShortString.valueOf(dest.getAddressName()), AMQShortString.valueOf(node.getExchangeType()), false, node.isDurable(), node.isAutoDelete(), - FieldTable.convertToFieldTable(node.getDeclareArgs()), false); + FieldTable.convertToFieldTable(arguments), false); // If bindings are specified without a queue name and is called by the producer, // the broker will send an exception as expected. @@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQSession(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + + + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + QueueBindBody queueBindBody = + methodRegistry.createQueueBindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + false, + FieldTable.convertToFieldTable(binding.getArgs())); + + getProtocolHandler().syncWrite(queueBindBody. + generateFrame(getChannelId()), QueueBindOkBody.class); + return null; + } + }, getAMQConnection()).execute(); + + } + + + protected void doUnbind(final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + + if (isBound(null, AMQShortString.valueOf(queue), null)) + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + AMQMethodBody body; + if (methodRegistry instanceof MethodRegistry_0_9) + { + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; + body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + null); + } + else if (methodRegistry instanceof MethodRegistry_0_91) + { + MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; + body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(binding.getBindingKey()), + null); + + } + else + { + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } + getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); + return null; + } + else + { + return null; + } + } + }, getAMQConnection()).execute(); } public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException @@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQSession(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDelete(dest.getAddressName()); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + else + { + if (isQueueExist(dest,false)) + { + + new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + } + + @Override + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doUnbind(binding, queue, exchange); + } + } + + + void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest.getQueueName(), false, false, false, false, null)) + { + (new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + return null; + } + }, getAMQConnection())).execute(); + + } + } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01e89b78c1..187be8522c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,19 +20,35 @@ */ package org.apache.qpid.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ public boolean isExclusive() { - return _exclusive; + + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } } public boolean isReceiving() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 658fb25ce4..8f91a7db08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,12 +17,18 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This is a 0.10 message consumer. */ @@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer { _waiting.set(true); - while (!_ready) + while (!_ready && _error == null) { try { diff --git a/qpid/java/systests/etc/config-systests.json b/qpid/java/systests/etc/config-systests.json index 4ec402c292..3ef4fa40f8 100644 --- a/qpid/java/systests/etc/config-systests.json +++ b/qpid/java/systests/etc/config-systests.json @@ -21,7 +21,6 @@ { "name": "Broker", "defaultVirtualHost" : "test", - "storeVersion": 1, "modelVersion": "2.0", "authenticationproviders" : [ { "name" : "plain", diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 14cadc2389..391498194b 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -52,7 +52,6 @@ import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; @@ -76,7 +75,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase @Override public void tearDown() throws Exception { - _connection.close(); + try + { + _connection.close(); + } + catch(JMSException e) + { + // ignore + } super.tearDown(); } @@ -90,14 +96,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; AMQDestination dest = new AMQAnyDestination(addr1); + final String queueErrorMessage = "The name 'testQueue1' supplied in the address " + + "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(queueErrorMessage)); } try @@ -106,12 +113,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains(queueErrorMessage) + || e.getCause().getCause().getMessage().contains(queueErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); + (AMQSession)jmsSession).isQueueExist(dest,false)); // create always ------------------------------------------- @@ -120,9 +127,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); // create receiver ----------------------------------------- @@ -134,33 +141,36 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + String expectedMessage = "The name 'testQueue2' supplied in the address " + + "doesn't resolve to an exchange or a queue"; + assertTrue(e.getCause().getMessage().contains(expectedMessage) + || e.getCause().getCause().getMessage().contains(expectedMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; dest = new AMQAnyDestination(addr1); + String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " + + "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); } try @@ -169,12 +179,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage) + || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; @@ -186,17 +196,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); } @@ -234,22 +243,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", + (AMQSession)jmsSession).isQueueBound("amq.fanout", dest.getAddressName(),null, null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); Map args = new HashMap(); @@ -257,7 +266,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); MessageProducer prod = jmsSession.createProducer(dest); @@ -339,6 +348,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { return; } + else if((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010() + && String.valueOf(AMQConstant.COMMAND_INVALID.getCode()).equals(e.getErrorCode())) + { + return; + } else { fail("Unexpected exception whilst creating consumer: " + e); @@ -346,11 +360,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertTrue("Exchange not created as expected",( - (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); + (AMQSession)jmsSession).isExchangeExist(dest,true)); // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + (AMQSession)jmsSession).isQueueBound("my-exchange", dest.getQueueName(),"hello", null)); // The client should be able to query and verify the existence of my-exchange (QPID-2774) @@ -387,23 +401,23 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception { assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), null)); + assertTrue("Queue not bound as expected", ( + (AMQSession) jmsSession).isQueueBound("", + dest.getAddressName(), dest.getAddressName(), null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } @@ -526,17 +540,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons3 = jmsSession.createConsumer(dest3); assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); + (AMQSession)jmsSession).isQueueExist(dest1, true)); assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); + (AMQSession)jmsSession).isQueueExist(dest2,true)); assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); MessageProducer producer = jmsSession.createProducer(dest3); @@ -587,7 +601,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); prod.send(ssn.createTextMessage("test")); @@ -606,7 +620,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { String s = "The name 'my-queue2' supplied in the address " + "doesn't resolve to an exchange or a queue"; - assertEquals(s,e.getCause().getCause().getMessage()); + assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage())); } // explicit create case @@ -614,7 +628,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession)ssn).isQueueBound("", "my-queue2","my-queue2", null)); prod.send(ssn.createTextMessage("test")); @@ -631,7 +645,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = ssn.createConsumer(queue); prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } @@ -701,15 +715,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(topic); assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","bus", null)); assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","car", null)); assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","van", null)); Message msg = ssn.createTextMessage("test"); @@ -822,15 +836,18 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase catch(Exception e) { } - _connection.close(); + } + + public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception + { _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - dest = ssn.createTopic("ADDR:my_queue; {create: always}"); - consumer1 = ssn.createConsumer(dest); - consumer2 = ssn.createConsumer(dest); - prod = ssn.createProducer(dest); + final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}"); + final MessageConsumer consumer1 = ssn.createConsumer(dest); + final MessageConsumer consumer2 = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(dest); prod.send(ssn.createTextMessage("A")); Message m1 = consumer1.receive(1000); @@ -864,15 +881,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons = ssn.createConsumer(topic); assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); MessageProducer prod = ssn.createProducer(topic); @@ -886,7 +903,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + String str = "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; Destination dest = ssn.createTopic(str); MessageConsumer consumer1 = ssn.createConsumer(dest); try @@ -937,7 +954,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase props.setProperty("destination.address1", "ADDR:amq.topic/test"); props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); - String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + String addrStr = "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; props.setProperty("destination.address5", addrStr); Context ctx = new InitialContext(props); @@ -1055,7 +1072,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; @@ -1071,7 +1088,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; @@ -1088,7 +1105,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); } /** @@ -1206,11 +1223,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase m.setJMSReplyTo(replyToDest); prod.send(m); - Message msg = cons.receive(); + Message msg = cons.receive(5000l); MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); prodR.send(session.createTextMessage("x")); - Message m1 = replyToCons.receive(); + Message m1 = replyToCons.receive(5000l); assertNotNull("The reply to consumer should have received the messsage",m1); } @@ -1422,7 +1439,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr); MessageConsumer cons = jmsSession.createConsumer(dest); - AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession; + AMQSession ssn = (AMQSession)jmsSession; assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true)); assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); @@ -1454,11 +1471,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}"; AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr); - ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + ((AMQSession)ssn).isQueueExist(verifyDest, true); // Verify that the producer does not delete the subscription queue. MessageProducer prod = ssn.createProducer(dest); prod.close(); - ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + ((AMQSession)ssn).isQueueExist(verifyDest, true); } } diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index b2f96ca279..e89753bef3 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -25,9 +25,17 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend org.apache.qpid.server.message.MessageProtocolConversionTest#* +//QPID-3422: test fails because ring queue is not implemented on java broker +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode +//QPID-3392: the Java broker does not yet implement exchange creation arguments +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs +//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic +// you want a topic behaviour. The 0-10 client thinks you must want a queue. +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10 + // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#* -org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties -- cgit v1.2.1