diff options
Diffstat (limited to 'qpid/java')
20 files changed, 523 insertions, 188 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java index a6fae97aaa..9b3f290723 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java @@ -85,7 +85,6 @@ public class BrokerOptions Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put("storePath", getConfigurationStoreLocation()); - attributes.put("storeTye", getConfigurationStoreType()); attributes.put(ConfiguredObject.CONTEXT, getConfigProperties()); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 4472669f4a..af1de8b099 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -111,6 +112,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost) { super(parentsMap(vhost), attributes); + Set<String> providedAttributeNames = new HashSet<>(attributes.keySet()); + providedAttributeNames.removeAll(getAttributeNames()); + if(!providedAttributeNames.isEmpty()) + { + throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames); + } _virtualHost = vhost; // check ACL try diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 011aaeee23..78da1227d5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -47,8 +47,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost"; String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod"; String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled"; - String STORE_TYPE = "storeType"; - String STORE_VERSION = "storeVersion"; String STORE_PATH = "storePath"; String MODEL_VERSION = "modelVersion"; String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 37e82b0771..49732e8345 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -87,6 +89,8 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES); ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); + ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE); + ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION); ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY); @@ -147,6 +151,10 @@ public class QueueArgumentsConverter { value = ((Enum) value).name(); } + else if(value instanceof ConfiguredObject) + { + value = ((ConfiguredObject)value).getName(); + } wireArguments.put(entry.getKey(), value); } } diff --git a/qpid/java/broker-core/src/main/resources/initial-config.json b/qpid/java/broker-core/src/main/resources/initial-config.json index 1403b1bd12..5ee820d9ac 100644 --- a/qpid/java/broker-core/src/main/resources/initial-config.json +++ b/qpid/java/broker-core/src/main/resources/initial-config.json @@ -20,7 +20,6 @@ */ { "name": "${broker.name}", - "storeVersion": 1, "modelVersion": "2.0", "defaultVirtualHost" : "default", "authenticationproviders" : [ { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index aaf88c81d5..3f48b413ef 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; @@ -115,15 +116,22 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { String name = exchangeName == null ? null : exchangeName.intern().toString(); String type = body.getType() == null ? null : body.getType().intern().toString(); - Map<String,Object> attributes = new HashMap<String, Object>(); + Map<String,Object> attributes = new HashMap<String, Object>(); + if(body.getArguments() != null) + { + attributes.putAll(FieldTable.convertToMap(body.getArguments())); + } attributes.put(org.apache.qpid.server.model.Exchange.ID, null); attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) + { + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + } exchange = virtualHost.createExchange(attributes); } @@ -160,6 +168,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); } + catch (IllegalArgumentException e) + { + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e); + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b64d355f80..2a91ff3ce2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); + private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private final long _connectionNumber; /** @@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); - private static final long DEFAULT_TIMEOUT = 1000 * 30; private AMQConnectionDelegate _delegate; @@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - close(DEFAULT_TIMEOUT); + close(DEFAULT_CLOSE_TIMEOUT); } public void close(long timeout) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0b299a22cd..0183c30276 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -313,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _immediatePrefetch; } + abstract void handleNodeDelete(final AMQDestination dest) throws AMQException; + + abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46f999e452..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + @Override void handleNodeDelete(AMQDestination dest) throws AMQException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index e5ca82f56a..0145d15111 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -29,6 +29,7 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -64,6 +65,7 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest, + final AMQShortString exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody - (getTicket(),queueName,exchangeName,routingKey,false,arguments). - generateFrame(getChannelId()), QueueBindOkBody.class); + if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) + { + + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody + (getTicket(), queueName, exchangeName, routingKey, false, arguments). + generateFrame(getChannelId()), QueueBindOkBody.class); + + } + else + { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. + List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>(); + bindings.addAll(destination.getNode().getBindings()); + + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? + destination.getAddressName(): "amq.topic"; + + for (AMQDestination.Binding binding: bindings) + { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchange : + binding.getExchange(); + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + doBind(destination, binding, queue, exchange); + } + } } public void sendClose(long timeout) throws AMQException, FailoverException @@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe Map<String,Object> bindingArguments = new HashMap<String, Object>(); bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); - bindQueue(AMQShortString.valueOf(queueName), - AMQShortString.valueOf(dest.getSubject()), - FieldTable.convertToFieldTable(bindingArguments), - AMQShortString.valueOf(dest.getAddressName()),dest,false); + final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); + doBind(dest, binding, queueName, dest.getAddressName()); } @@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + public void sendExchangeDelete(final String name) throws AMQException, FailoverException + { + ExchangeDeleteBody body = + getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { AMQShortString queueName = amqd.getAMQQueueName(); @@ -821,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { - AMQFrame queueDeclare = - getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), - true, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), - false, - null).generateFrame(getChannelId()); - QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); - return okHandler.getMessageCount(); + if(isBound(null, amqd.getAMQQueueName(), null)) + { + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(getChannelId()); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + return okHandler.getMessageCount(); + } + else + { + return 0l; + } } protected boolean tagLE(long tag1, long tag2) @@ -916,6 +969,11 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { arguments.put(AddressHelper.NO_LOCAL, noLocal); } + String altExchange = node.getAlternateExchange(); + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } (new FailoverNoopSupport<Void, AMQException>( new FailoverProtectedOperation<Void, AMQException>() @@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe void handleExchangeNodeCreation(AMQDestination dest) throws AMQException { Node node = dest.getNode(); + String altExchange = dest.getNode().getAlternateExchange(); + Map<String, Object> arguments = node.getDeclareArgs(); + + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + // can't set alt. exchange declareExchange(AMQShortString.valueOf(dest.getAddressName()), AMQShortString.valueOf(node.getExchangeType()), false, node.isDurable(), node.isAutoDelete(), - FieldTable.convertToFieldTable(node.getDeclareArgs()), false); + FieldTable.convertToFieldTable(arguments), false); // If bindings are specified without a queue name and is called by the producer, // the broker will send an exception as expected. @@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe final String queue, final String exchange) throws AMQException { - bindQueue(AMQShortString.valueOf(queue),AMQShortString.valueOf(binding.getBindingKey()), - FieldTable.convertToFieldTable(binding.getArgs()), - AMQShortString.valueOf(exchange),dest); + final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + QueueBindBody queueBindBody = + methodRegistry.createQueueBindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + false, + FieldTable.convertToFieldTable(binding.getArgs())); + + getProtocolHandler().syncWrite(queueBindBody. + generateFrame(getChannelId()), QueueBindOkBody.class); + return null; + } + }, getAMQConnection()).execute(); + + } + + + protected void doUnbind(final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + if (isBound(null, AMQShortString.valueOf(queue), null)) + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + AMQMethodBody body; + if (methodRegistry instanceof MethodRegistry_0_9) + { + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; + body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + null); + } + else if (methodRegistry instanceof MethodRegistry_0_91) + { + MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; + body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(binding.getBindingKey()), + null); + + } + else + { + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } + getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); + return null; + } + else + { + return null; + } + } + }, getAMQConnection()).execute(); } public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException @@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return match; } + @Override + void handleNodeDelete(final AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDelete(dest.getAddressName()); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + else + { + if (isQueueExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + } + + @Override + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doUnbind(binding, queue, exchange); + } + } + + + void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest.getQueueName(), false, false, false, false, null)) + { + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + return null; + } + }, getAMQConnection())).execute(); + + } + } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01e89b78c1..187be8522c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,19 +20,35 @@ */ package org.apache.qpid.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ public boolean isExclusive() { - return _exclusive; + + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } } public boolean isReceiving() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 658fb25ce4..8f91a7db08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,12 +17,18 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This is a 0.10 message consumer. */ @@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } - - public boolean isExclusive() - { - AMQDestination dest = this.getDestination(); - if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) - { - return true; - } - else - { - return dest.getLink().getSubscription().isExclusive(); - } - } - else - { - return super.isExclusive(); - } - } + void postSubscription() throws AMQException { @@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).handleNodeDelete(dest); + getSession().handleNodeDelete(dest); } // Subscription queue is handled as part of linkDelete method. - ((AMQSession_0_10) getSession()).handleLinkDelete(dest); + getSession().handleLinkDelete(dest); if (!isDurableSubscriber()) { ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); @@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 23d65e15d8..cdffc73932 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -118,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe final AMQFrame cancelFrame = body.generateFrame(getChannelId()); getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); - + postSubscription(); + getSession().sync(); if (_logger.isDebugEnabled()) { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } } + void postSubscription() throws AMQException + { + AMQDestination dest = this.getDestination(); + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.RECEIVER ) + { + getSession().handleNodeDelete(dest); + } + // Subscription queue is handled as part of linkDelete method. + getSession().handleLinkDelete(dest); + if (!isDurableSubscriber()) + { + getSession().deleteSubscriptionQueue(dest); + } + } + } + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 33bafe8f20..1d47ce9a07 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.UUID; + import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,13 +33,15 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; + +import org.slf4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { setClosed(); _session.deregisterProducer(_producerId); + AMQDestination dest = getAMQDestination(); + AMQSession ssn = getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + try + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.SENDER ) + { + ssn.handleNodeDelete(dest); + } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; + } + } } public void send(Message message) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index eb8104b02c..06a3b08272 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; @@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { getSession().resolveAddress(destination,false,false); - ((AMQSession_0_10)getSession()).handleLinkCreation(destination); - ((AMQSession_0_10)getSession()).sync(); + getSession().handleLinkCreation(destination); + getSession().sync(); } catch(Exception e) { @@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public void close() throws JMSException { super.close(); - AMQDestination dest = getAMQDestination(); - AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); - if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - try - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - ssn.handleNodeDelete(dest); - } - ssn.handleLinkDelete(dest); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } - catch (AMQException e) - { - JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); - ex.setLinkedException(e); - ex.initCause(e); - throw ex; - } - } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 89bf146398..e1b399e10a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.util.GZIPUtils; @@ -63,6 +66,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { getSession().resolveAddress(destination, false, false); + + getSession().handleLinkCreation(destination); + getSession().sync(); } else { @@ -92,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { + + + + AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); + BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); + + AMQShortString routingKey = destination.getRoutingKey(); + + FieldTable headers = delegate.getContentHeaderProperties().getHeaders(); + + if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && + (destination.getSubject() != null + || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null))) + { + + if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null) + { + // use default subject in address string + headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject()); + } + + if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + { + routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT)); + } + } + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); + destination.getExchangeName(), + routingKey, + mandatory, + immediate); AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); - AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); - BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); contentHeaderProperties.setUserId(getUserID()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 21f1623dd1..747668ff9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T> { _waiting.set(true); - while (!_ready) + while (!_ready && _error == null) { try { diff --git a/qpid/java/systests/etc/config-systests.json b/qpid/java/systests/etc/config-systests.json index 4ec402c292..3ef4fa40f8 100644 --- a/qpid/java/systests/etc/config-systests.json +++ b/qpid/java/systests/etc/config-systests.json @@ -21,7 +21,6 @@ { "name": "Broker", "defaultVirtualHost" : "test", - "storeVersion": 1, "modelVersion": "2.0", "authenticationproviders" : [ { "name" : "plain", diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 14cadc2389..391498194b 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -52,7 +52,6 @@ import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; @@ -76,7 +75,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase @Override public void tearDown() throws Exception { - _connection.close(); + try + { + _connection.close(); + } + catch(JMSException e) + { + // ignore + } super.tearDown(); } @@ -90,14 +96,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; AMQDestination dest = new AMQAnyDestination(addr1); + final String queueErrorMessage = "The name 'testQueue1' supplied in the address " + + "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(queueErrorMessage)); } try @@ -106,12 +113,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains(queueErrorMessage) + || e.getCause().getCause().getMessage().contains(queueErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); + (AMQSession)jmsSession).isQueueExist(dest,false)); // create always ------------------------------------------- @@ -120,9 +127,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); // create receiver ----------------------------------------- @@ -134,33 +141,36 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + String expectedMessage = "The name 'testQueue2' supplied in the address " + + "doesn't resolve to an exchange or a queue"; + assertTrue(e.getCause().getMessage().contains(expectedMessage) + || e.getCause().getCause().getMessage().contains(expectedMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; dest = new AMQAnyDestination(addr1); + String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " + + "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); } try @@ -169,12 +179,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage) + || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; @@ -186,17 +196,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); } @@ -234,22 +243,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", + (AMQSession)jmsSession).isQueueBound("amq.fanout", dest.getAddressName(),null, null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); Map<String,Object> args = new HashMap<String,Object>(); @@ -257,7 +266,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); MessageProducer prod = jmsSession.createProducer(dest); @@ -339,6 +348,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { return; } + else if((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010() + && String.valueOf(AMQConstant.COMMAND_INVALID.getCode()).equals(e.getErrorCode())) + { + return; + } else { fail("Unexpected exception whilst creating consumer: " + e); @@ -346,11 +360,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertTrue("Exchange not created as expected",( - (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); + (AMQSession)jmsSession).isExchangeExist(dest,true)); // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + (AMQSession)jmsSession).isQueueBound("my-exchange", dest.getQueueName(),"hello", null)); // The client should be able to query and verify the existence of my-exchange (QPID-2774) @@ -387,23 +401,23 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception { assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), null)); + assertTrue("Queue not bound as expected", ( + (AMQSession) jmsSession).isQueueBound("", + dest.getAddressName(), dest.getAddressName(), null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } @@ -526,17 +540,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons3 = jmsSession.createConsumer(dest3); assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); + (AMQSession)jmsSession).isQueueExist(dest1, true)); assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); + (AMQSession)jmsSession).isQueueExist(dest2,true)); assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); MessageProducer producer = jmsSession.createProducer(dest3); @@ -587,7 +601,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); prod.send(ssn.createTextMessage("test")); @@ -606,7 +620,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { String s = "The name 'my-queue2' supplied in the address " + "doesn't resolve to an exchange or a queue"; - assertEquals(s,e.getCause().getCause().getMessage()); + assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage())); } // explicit create case @@ -614,7 +628,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession)ssn).isQueueBound("", "my-queue2","my-queue2", null)); prod.send(ssn.createTextMessage("test")); @@ -631,7 +645,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = ssn.createConsumer(queue); prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } @@ -701,15 +715,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(topic); assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","bus", null)); assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","car", null)); assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","van", null)); Message msg = ssn.createTextMessage("test"); @@ -822,15 +836,18 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase catch(Exception e) { } - _connection.close(); + } + + public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception + { _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - dest = ssn.createTopic("ADDR:my_queue; {create: always}"); - consumer1 = ssn.createConsumer(dest); - consumer2 = ssn.createConsumer(dest); - prod = ssn.createProducer(dest); + final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}"); + final MessageConsumer consumer1 = ssn.createConsumer(dest); + final MessageConsumer consumer2 = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(dest); prod.send(ssn.createTextMessage("A")); Message m1 = consumer1.receive(1000); @@ -864,15 +881,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons = ssn.createConsumer(topic); assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); MessageProducer prod = ssn.createProducer(topic); @@ -886,7 +903,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + String str = "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; Destination dest = ssn.createTopic(str); MessageConsumer consumer1 = ssn.createConsumer(dest); try @@ -937,7 +954,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase props.setProperty("destination.address1", "ADDR:amq.topic/test"); props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); - String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + String addrStr = "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; props.setProperty("destination.address5", addrStr); Context ctx = new InitialContext(props); @@ -1055,7 +1072,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; @@ -1071,7 +1088,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; @@ -1088,7 +1105,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); } /** @@ -1206,11 +1223,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase m.setJMSReplyTo(replyToDest); prod.send(m); - Message msg = cons.receive(); + Message msg = cons.receive(5000l); MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); prodR.send(session.createTextMessage("x")); - Message m1 = replyToCons.receive(); + Message m1 = replyToCons.receive(5000l); assertNotNull("The reply to consumer should have received the messsage",m1); } @@ -1422,7 +1439,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr); MessageConsumer cons = jmsSession.createConsumer(dest); - AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession; + AMQSession ssn = (AMQSession)jmsSession; assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true)); assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); @@ -1454,11 +1471,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}"; AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr); - ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + ((AMQSession)ssn).isQueueExist(verifyDest, true); // Verify that the producer does not delete the subscription queue. MessageProducer prod = ssn.createProducer(dest); prod.close(); - ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + ((AMQSession)ssn).isQueueExist(verifyDest, true); } } diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index b2f96ca279..e89753bef3 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -25,9 +25,17 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend org.apache.qpid.server.message.MessageProtocolConversionTest#* +//QPID-3422: test fails because ring queue is not implemented on java broker +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode +//QPID-3392: the Java broker does not yet implement exchange creation arguments +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs +//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic +// you want a topic behaviour. The 0-10 client thinks you must want a queue. +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10 + // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#* -org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties |
