From 626c1e679439bd42d4486bb16d5dfca39f99c147 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 24 Aug 2014 16:17:11 +0000 Subject: QPID-6037 : [Java Client] Enhance experimental support for ADDR addressing to the 0-8/9/9-1 client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620147 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 5 +- .../java/org/apache/qpid/client/AMQSession.java | 4 + .../org/apache/qpid/client/AMQSession_0_10.java | 1 + .../org/apache/qpid/client/AMQSession_0_8.java | 280 +++++++++++++++++++-- .../apache/qpid/client/BasicMessageConsumer.java | 51 ++-- .../qpid/client/BasicMessageConsumer_0_10.java | 41 +-- .../qpid/client/BasicMessageConsumer_0_8.java | 22 +- .../apache/qpid/client/BasicMessageProducer.java | 30 ++- .../qpid/client/BasicMessageProducer_0_10.java | 31 +-- .../qpid/client/BasicMessageProducer_0_8.java | 43 +++- .../apache/qpid/client/util/BlockingWaiter.java | 2 +- 11 files changed, 399 insertions(+), 111 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b64d355f80..2a91ff3ce2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); + private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private final long _connectionNumber; /** @@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); - private static final long DEFAULT_TIMEOUT = 1000 * 30; private AMQConnectionDelegate _delegate; @@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - close(DEFAULT_TIMEOUT); + close(DEFAULT_CLOSE_TIMEOUT); } public void close(long timeout) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0b299a22cd..0183c30276 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -313,6 +313,10 @@ public abstract class AMQSession { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 46f999e452..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1462,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession { @@ -175,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession bindings = new ArrayList(); + bindings.addAll(destination.getNode().getBindings()); + + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? + destination.getAddressName(): "amq.topic"; + + for (AMQDestination.Binding binding: bindings) + { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchange : + binding.getExchange(); + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + doBind(destination, binding, queue, exchange); + } + } } public void sendClose(long timeout) throws AMQException, FailoverException @@ -547,10 +586,8 @@ public class AMQSession_0_8 extends AMQSession bindingArguments = new HashMap(); bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); - bindQueue(AMQShortString.valueOf(queueName), - AMQShortString.valueOf(dest.getSubject()), - FieldTable.convertToFieldTable(bindingArguments), - AMQShortString.valueOf(dest.getAddressName()),dest,false); + final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); + doBind(dest, binding, queueName, dest.getAddressName()); } @@ -589,6 +626,15 @@ public class AMQSession_0_8 extends AMQSession( new FailoverProtectedOperation() @@ -942,13 +1000,21 @@ public class AMQSession_0_8 extends AMQSession arguments = node.getDeclareArgs(); + + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + // can't set alt. exchange declareExchange(AMQShortString.valueOf(dest.getAddressName()), AMQShortString.valueOf(node.getExchangeType()), false, node.isDurable(), node.isAutoDelete(), - FieldTable.convertToFieldTable(node.getDeclareArgs()), false); + FieldTable.convertToFieldTable(arguments), false); // If bindings are specified without a queue name and is called by the producer, // the broker will send an exception as expected. @@ -962,9 +1028,79 @@ public class AMQSession_0_8 extends AMQSession(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + + + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + QueueBindBody queueBindBody = + methodRegistry.createQueueBindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + false, + FieldTable.convertToFieldTable(binding.getArgs())); + + getProtocolHandler().syncWrite(queueBindBody. + generateFrame(getChannelId()), QueueBindOkBody.class); + return null; + } + }, getAMQConnection()).execute(); + + } + + + protected void doUnbind(final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + + if (isBound(null, AMQShortString.valueOf(queue), null)) + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + AMQMethodBody body; + if (methodRegistry instanceof MethodRegistry_0_9) + { + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; + body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + null); + } + else if (methodRegistry instanceof MethodRegistry_0_91) + { + MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; + body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(binding.getBindingKey()), + null); + + } + else + { + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } + getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); + return null; + } + else + { + return null; + } + } + }, getAMQConnection()).execute(); } public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException @@ -1057,6 +1193,102 @@ public class AMQSession_0_8 extends AMQSession(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDelete(dest.getAddressName()); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + else + { + if (isQueueExist(dest,false)) + { + + new FailoverNoopSupport(new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + } + + @Override + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doUnbind(binding, queue, exchange); + } + } + + + void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest.getQueueName(), false, false, false, false, null)) + { + (new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + return null; + } + }, getAMQConnection())).execute(); + + } + } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01e89b78c1..187be8522c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,19 +20,35 @@ */ package org.apache.qpid.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa */ public boolean isExclusive() { - return _exclusive; + + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } } public boolean isReceiving() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 658fb25ce4..8f91a7db08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,12 +17,18 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This is a 0.10 message consumer. */ @@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer { _waiting.set(true); - while (!_ready) + while (!_ready && _error == null) { try { -- cgit v1.2.1