diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
| commit | 1f92ceb67b4c717d425cad75c5ecde8e08f7874e (patch) | |
| tree | e0b5a10c8c765bc52d5abc4c1674dc9da61cdffe /java/broker | |
| parent | 690b61e5fe6cb9ee60406eacffb7584c1f3a1a83 (diff) | |
| download | qpid-python-1f92ceb67b4c717d425cad75c5ecde8e08f7874e.tar.gz | |
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
14 files changed, 944 insertions, 1218 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 0d05307cb4..58c2b33041 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -173,20 +174,106 @@ public abstract class AbstractExchange implements Exchange return getVirtualHost().getQueueRegistry(); } - public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue) + public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue); } + public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return (b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments); + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return isBound(routingKey==null ? "" : routingKey.asString(), queue); + } + + public final boolean isBound(String bindingKey, AMQQueue queue) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey()) && queue == b.getQueue()) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQShortString routingKey) + { + return isBound(routingKey == null ? "" : routingKey.asString()); + } + + public final boolean isBound(String bindingKey) + { + for(Binding b : _bindings) + { + if(bindingKey.equals(b.getBindingKey())) + { + return true; + } + } + return false; + } + + public final boolean isBound(AMQQueue queue) + { + for(Binding b : _bindings) + { + if(queue == b.getQueue()) + { + return true; + } + } + return false; + } - public boolean isBound(String bindingKey, AMQQueue queue) + @Override + public final boolean isBound(Map<String, Object> arguments, AMQQueue queue) { - return isBound(new AMQShortString(bindingKey), queue); + for(Binding b : _bindings) + { + if(queue == b.getQueue() && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; + } + + @Override + public final boolean isBound(String bindingKey, Map<String, Object> arguments) + { + for(Binding b : _bindings) + { + if(b.getBindingKey().equals(bindingKey) && + ((b.getArguments() == null || b.getArguments().isEmpty()) + ? (arguments == null || arguments.isEmpty()) + : b.getArguments().equals(arguments))) + { + return true; + } + } + return false; } - public boolean isBound(String bindingKey) + public final boolean hasBindings() { - return isBound(new AMQShortString(bindingKey)); + return !_bindings.isEmpty(); } public Exchange getAlternateExchange() diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 4e136965a1..ccf955ed1c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -272,6 +272,18 @@ public class DefaultExchange implements Exchange } @Override + public boolean isBound(Map<String, Object> arguments, AMQQueue queue) + { + return (arguments == null || arguments.isEmpty()) && isBound(queue); + } + + @Override + public boolean isBound(String bindingKey, Map<String, Object> arguments) + { + return (arguments == null || arguments.isEmpty()) && isBound(bindingKey); + } + + @Override public boolean isBound(String bindingKey) { return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index fc6ce15bc4..2e2a93d638 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -20,9 +20,18 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -36,10 +45,14 @@ import java.util.concurrent.CopyOnWriteArraySet; public class DirectExchange extends AbstractExchange { + + private static final Logger _logger = Logger.getLogger(DirectExchange.class); + private static final class BindingSet { private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>(); - private List<BaseQueue> _queues = new ArrayList<BaseQueue>(); + private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>(); + private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>(); public synchronized void addBinding(Binding binding) { @@ -56,27 +69,59 @@ public class DirectExchange extends AbstractExchange private void recalculateQueues() { List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size()); + Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>(); for(Binding b : _bindings) { - if(!queues.contains(b.getQueue())) + + if(FilterSupport.argumentsContainFilter(b.getArguments())) { - queues.add(b.getQueue()); + try + { + MessageFilter filter = FilterSupport.createMessageFilter(b.getArguments(), b.getQueue()); + filteredQueues.put(b.getQueue(),filter); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getQueue().getName() + + "' to exchange '" + b.getExchange().getName() + + "' with arguments: " + b.getArguments(), e); + } + + } + else + { + + if(!queues.contains(b.getQueue())) + { + queues.add(b.getQueue()); + } } } - _queues = queues; + _unfilteredQueues = queues; + _filteredQueues = filteredQueues; } - public List<BaseQueue> getQueues() + public List<BaseQueue> getUnfilteredQueues() { - return _queues; + return _unfilteredQueues; } public CopyOnWriteArraySet<Binding> getBindings() { return _bindings; } + + public boolean hasFilteredQueues() + { + return !_filteredQueues.isEmpty(); + } + + public Map<BaseQueue,MessageFilter> getFilteredQueues() + { + return _filteredQueues; + } } private final ConcurrentHashMap<String, BindingSet> _bindingsByKey = @@ -98,7 +143,30 @@ public class DirectExchange extends AbstractExchange if(bindings != null) { - return bindings.getQueues(); + List<BaseQueue> queues = bindings.getUnfilteredQueues(); + + if(bindings.hasFilteredQueues()) + { + Set<BaseQueue> queuesSet = new HashSet<BaseQueue>(queues); + + Map<BaseQueue, MessageFilter> filteredQueues = bindings.getFilteredQueues(); + for(Map.Entry<BaseQueue, MessageFilter> entry : filteredQueues.entrySet()) + { + if(!queuesSet.contains(entry.getKey())) + { + MessageFilter filter = entry.getValue(); + if(filter.matches(payload)) + { + queuesSet.add(entry.getKey()); + } + } + } + if(queues.size() != queuesSet.size()) + { + queues = new ArrayList<BaseQueue>(queuesSet); + } + } + return queues; } else { @@ -106,50 +174,6 @@ public class DirectExchange extends AbstractExchange } - - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey,queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - if(bindings != null) - { - return bindings.getQueues().contains(queue); - } - return false; - - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - BindingSet bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.getQueues().isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - - for (BindingSet bindings : _bindingsByKey.values()) - { - if(bindings.getQueues().contains(queue)) - { - return true; - } - - } - return false; - } - - public boolean hasBindings() - { - return !getBindings().isEmpty(); } protected void onBind(final Binding binding) @@ -189,5 +213,4 @@ public class DirectExchange extends AbstractExchange } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index a5a1d7f912..d483c3b29b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -145,12 +145,15 @@ public interface Exchange extends ExchangeReferrer Collection<Binding> getBindings(); + boolean isBound(String bindingKey); boolean isBound(String bindingKey, AMQQueue queue); - public boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); + boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue); - boolean isBound(String bindingKey); + boolean isBound(Map<String, Object> arguments, AMQQueue queue); + + boolean isBound(String bindingKey, Map<String, Object> arguments); void removeReference(ExchangeReferrer exchange); @@ -158,6 +161,8 @@ public interface Exchange extends ExchangeReferrer boolean hasReferrers(); + + public interface BindingListener { void bindingAdded(Exchange exchange, Binding binding); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6ad5eb261e..cd830d69a9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -20,11 +20,21 @@ */ package org.apache.qpid.server.exchange; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -42,7 +52,18 @@ public class FanoutExchange extends AbstractExchange /** * Maps from queue name to queue instances */ - private final ConcurrentHashMap<AMQQueue,Integer> _queues = new ConcurrentHashMap<AMQQueue,Integer>(); + private final Map<AMQQueue,Integer> _queues = new HashMap<AMQQueue,Integer>(); + private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>(); + private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>(); + + private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings = + new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>(); + { + Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap(); + _filteredBindings.set(emptyMap); + } + + public static final ExchangeType<FanoutExchange> TYPE = new FanoutExchangeType(); @@ -54,115 +75,150 @@ public class FanoutExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Publishing message to queue " + _queues); - } - for(Binding b : getBindings()) { b.incrementMatches(); } - return new ArrayList<BaseQueue>(_queues.keySet()); - - } + final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues); - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return isBound(routingKey, queue); - } - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(queue); - } + final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get(); + if(!_filteredQueues.isEmpty()) + { + for(AMQQueue q : _filteredQueues) + { + final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q); + if(!(bindingMessageFilterMap == null || result.contains(q))) + { + for(MessageFilter filter : bindingMessageFilterMap.values()) + { + if(filter.matches(payload)) + { + result.add(q); + break; + } + } + } + } - public boolean isBound(AMQShortString routingKey) - { + } - return (_queues != null) && !_queues.isEmpty(); - } - public boolean isBound(AMQQueue queue) - { - if (queue == null) + if (_logger.isDebugEnabled()) { - return false; + _logger.debug("Publishing message to queue " + result); } - return _queues.containsKey(queue); - } - public boolean hasBindings() - { - return !_queues.isEmpty(); + return result; + } - protected void onBind(final Binding binding) + + protected synchronized void onBind(final Binding binding) { AMQQueue queue = binding.getQueue(); assert queue != null; + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) + { - Integer oldVal; + Integer oldVal; + if(_queues.containsKey(queue)) + { + _queues.put(queue,_queues.get(queue)+1); + } + else + { + _queues.put(queue, ONE); + _unfilteredQueues.add(queue); + // No longer any reason to check filters for this queue + _filteredQueues.remove(queue); + } - if((oldVal = _queues.putIfAbsent(queue, ONE)) != null) + } + else { - Integer newVal = oldVal+1; - while(!_queues.replace(queue, oldVal, newVal)) + try { - oldVal = _queues.get(queue); - if(oldVal == null) + + HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get()); + + Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue()); + final + MessageFilter messageFilter = + FilterSupport.createMessageFilter(binding.getArguments(), binding.getQueue()); + + if(bindingsForQueue != null) { - oldVal = _queues.putIfAbsent(queue, ONE); - if(oldVal == null) + bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue); + bindingsForQueue.put(binding, messageFilter); + } + else + { + bindingsForQueue = Collections.singletonMap(binding, messageFilter); + if(!_unfilteredQueues.contains(queue)) { - break; + _filteredQueues.add(queue); } } - newVal = oldVal + 1; + + filteredBindings.put(binding.getQueue(), bindingsForQueue); + + _filteredBindings.set(filteredBindings); + + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Cannoy bind queue " + queue + " to exchange this " + this + " beacuse selector cannot be parsed.", e); + return; } } - if (_logger.isDebugEnabled()) { _logger.debug("Binding queue " + queue - + " with routing key " + new AMQShortString(binding.getBindingKey()) + " to exchange " + this); + + " with routing key " + binding.getBindingKey() + " to exchange " + this); } } - protected void onUnbind(final Binding binding) + protected synchronized void onUnbind(final Binding binding) { AMQQueue queue = binding.getQueue(); - Integer oldValue = _queues.get(queue); - - boolean done = false; - - while(!(done || oldValue == null)) + if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments())) { - while(!(done || oldValue == null) && oldValue.intValue() == 1) + Integer oldValue = _queues.remove(queue); + if(ONE.equals(oldValue)) { - if(!_queues.remove(queue, oldValue)) + // should start checking filters for this queue + if(_filteredBindings.get().containsKey(queue)) { - oldValue = _queues.get(queue); - } - else - { - done = true; + _filteredQueues.add(queue); } + _unfilteredQueues.remove(queue); } - while(!(done || oldValue == null) && oldValue.intValue() != 1) + else { - Integer newValue = oldValue - 1; - if(!_queues.replace(queue, oldValue, newValue)) - { - oldValue = _queues.get(queue); - } - else - { - done = true; - } + _queues.put(queue,oldValue-1); } } + else // we are removing a binding with filters + { + HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings = + new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get()); + + Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getQueue()); + if(bindingsForQueue.size()>1) + { + bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue); + bindingsForQueue.remove(binding); + filteredBindings.put(binding.getQueue(),bindingsForQueue); + } + else + { + _filteredQueues.remove(queue); + } + _filteredBindings.set(filteredBindings); + + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java new file mode 100644 index 0000000000..880c9a2cf6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.exchange; + +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.filter.SelectorParsingException; +import org.apache.qpid.filter.selector.ParseException; +import org.apache.qpid.filter.selector.TokenMgrError; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.Filterable; + +public class FilterSupport +{ + private static final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = + Collections.synchronizedMap(new WeakHashMap<String, WeakReference<JMSSelectorFilter>>()); + + static MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException + { + final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + static MessageFilter createJMSSelectorFilter(Map<String, Object> args) throws AMQInvalidArgumentException + { + final String selectorString = (String) args.get(AMQPFilterTypes.JMS_SELECTOR.getValue()); + return getMessageFilter(selectorString); + } + + + private static MessageFilter getMessageFilter(String selectorString) throws AMQInvalidArgumentException + { + WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); + JMSSelectorFilter selector = null; + + if(selectorRef == null || (selector = selectorRef.get())==null) + { + try + { + selector = new JMSSelectorFilter(selectorString); + } + catch (ParseException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (SelectorParsingException e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + catch (TokenMgrError e) + { + throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); + } + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); + } + return selector; + } + + static boolean argumentsContainFilter(final FieldTable args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainFilter(final Map<String, Object> args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + + static boolean argumentsContainNoLocal(final Map<String, Object> args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.toString()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.toString())); + } + + + static boolean argumentsContainNoLocal(final FieldTable args) + { + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); + } + + + static boolean argumentsContainJMSSelector(final Map<String,Object> args) + { + return args != null && (args.get(AMQPFilterTypes.JMS_SELECTOR.toString()) instanceof String) + && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; + } + + + static boolean argumentsContainJMSSelector(final FieldTable args) + { + return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) + && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); + } + + + static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException + { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + static final class NoLocalFilter implements MessageFilter + { + private final AMQQueue _queue; + + public NoLocalFilter(AMQQueue queue) + { + _queue = queue; + } + + public boolean matches(Filterable message) + { + InboundMessage inbound = (InboundMessage) message; + final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); + return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + + if (o == null || getClass() != o.getClass()) + { + return false; + } + + NoLocalFilter that = (NoLocalFilter) o; + + return _queue == null ? that._queue == null : _queue.equals(that._queue); + } + + @Override + public int hashCode() + { + return _queue != null ? _queue.hashCode() : 0; + } + } + + static final class CompoundFilter implements MessageFilter + { + private MessageFilter _noLocalFilter; + private MessageFilter _jmsSelectorFilter; + + public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) + { + _noLocalFilter = filter; + _jmsSelectorFilter = jmsSelectorFilter; + } + + public boolean matches(Filterable message) + { + return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + CompoundFilter that = (CompoundFilter) o; + + if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + { + return false; + } + if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; + result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); + return result; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index b6f5f973f4..eb4a84a5b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -22,15 +22,19 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; +import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.AMQMessageHeader; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.Filterable; /** * Defines binding and matching based on a set of headers. @@ -44,13 +48,14 @@ class HeadersBinding private final Set<String> required = new HashSet<String>(); private final Map<String,Object> matches = new HashMap<String,Object>(); private boolean matchAny; + private MessageFilter _filter; /** * Creates a header binding for a set of mappings. Those mappings whose value is * null or the empty string are assumed only to be required headers, with * no constraint on the value. Those with a non-null value are assumed to * define a required match of value. - * + * * @param binding the binding to create a header binding using */ public HeadersBinding(Binding binding) @@ -66,9 +71,30 @@ class HeadersBinding _mappings = null; } } - + private void initMappings() { + if(FilterSupport.argumentsContainFilter(_mappings)) + { + try + { + _filter = FilterSupport.createMessageFilter(_mappings,_binding.getQueue()); + } + catch (AMQInvalidArgumentException e) + { + _logger.warn("Invalid filter in binding queue '"+_binding.getQueue().getName() + +"' to exchange '"+_binding.getExchange().getName() + +"' with arguments: " + _binding.getArguments()); + _filter = new MessageFilter() + { + @Override + public boolean matches(Filterable message) + { + return false; + } + }; + } + } for(Map.Entry<String, Object> entry : _mappings.entrySet()) { String propertyName = entry.getKey(); @@ -87,7 +113,7 @@ class HeadersBinding } } } - + public Binding getBinding() { return _binding; @@ -111,6 +137,11 @@ class HeadersBinding } } + public boolean matches(InboundMessage message) + { + return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); + } + private boolean and(AMQMessageHeader headers) { if(headers.containsHeaders(required)) @@ -215,7 +246,7 @@ class HeadersBinding { return key.startsWith("X-") || key.startsWith("x-"); } - + @Override public boolean equals(final Object o) { @@ -250,4 +281,4 @@ class HeadersBinding return true; } -}
\ No newline at end of file +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 746c8ac6bc..9fb745d553 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -69,14 +69,14 @@ public class HeadersExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(HeadersExchange.class); - + private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey = new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>(); - + private final CopyOnWriteArrayList<HeadersBinding> _bindingHeaderMatchers = new CopyOnWriteArrayList<HeadersBinding>(); - + public static final ExchangeType<HeadersExchange> TYPE = new HeadersExchangeType(); public HeadersExchange() @@ -87,112 +87,31 @@ public class HeadersExchange extends AbstractExchange public ArrayList<BaseQueue> doRoute(InboundMessage payload) { - AMQMessageHeader header = payload.getMessageHeader(); if (_logger.isDebugEnabled()) { - _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + header); + _logger.debug("Exchange " + getNameShortString() + ": routing message with headers " + payload.getMessageHeader()); } - + LinkedHashSet<BaseQueue> queues = new LinkedHashSet<BaseQueue>(); - + for (HeadersBinding hb : _bindingHeaderMatchers) { - if (hb.matches(header)) + if (hb.matches(payload)) { Binding b = hb.getBinding(); - + b.incrementMatches(); - + if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": delivering message with headers " + - header + " to " + b.getQueue().getNameShortString()); + payload.getMessageHeader() + " to " + b.getQueue().getNameShortString()); } queues.add(b.getQueue()); } } - - return new ArrayList<BaseQueue>(queues); - } - - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - CopyOnWriteArraySet<Binding> bindings; - if(bindingKey == null) - { - bindings = new CopyOnWriteArraySet<Binding>(getBindings()); - } - else - { - bindings = _bindingsByKey.get(bindingKey); - } - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(queue == null || binding.getQueue().equals(queue)) - { - return arguments == null ? binding.getArguments() == null : binding.getArguments().equals(arguments); - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - //fixme isBound here should take the arguements in to consideration. - return isBound(routingKey, queue); - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - - if(bindings != null) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - - public boolean isBound(AMQShortString routingKey) - { - String bindingKey = (routingKey == null) ? "" : routingKey.toString(); - CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey); - return bindings != null && !bindings.isEmpty(); - } - - public boolean isBound(AMQQueue queue) - { - for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values()) - { - for(Binding binding : bindings) - { - if(binding.getQueue().equals(queue)) - { - return true; - } - } - } - - return false; - } - public boolean hasBindings() - { - return !getBindings().isEmpty(); + return new ArrayList<BaseQueue>(queues); } protected void onBind(final Binding binding) @@ -216,7 +135,7 @@ public class HeadersExchange extends AbstractExchange bindings = newBindings; } } - + if(_logger.isDebugEnabled()) { _logger.debug("Exchange " + getNameShortString() + ": Binding " + queue.getNameShortString() + diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 6d548be508..9d41856dc0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -20,21 +20,15 @@ */ package org.apache.qpid.server.exchange; -import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; -import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.filter.SelectorParsingException; -import org.apache.qpid.filter.selector.ParseException; -import org.apache.qpid.filter.selector.TokenMgrError; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.binding.Binding; @@ -42,14 +36,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; -import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.Filterable; public class TopicExchange extends AbstractExchange { @@ -65,8 +55,6 @@ public class TopicExchange extends AbstractExchange private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); - private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>(); - public TopicExchange() { super(TYPE); @@ -77,7 +65,7 @@ public class TopicExchange extends AbstractExchange AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ; AMQQueue queue = binding.getQueue(); FieldTable args = FieldTable.convertToFieldTable(binding.getArguments()); - + assert queue != null; assert rKey != null; @@ -91,26 +79,26 @@ public class TopicExchange extends AbstractExchange FieldTable oldArgs = _bindings.get(binding); TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.replaceQueueFilter(queue, - createMessageFilter(oldArgs, queue), - createMessageFilter(args, queue)); + FilterSupport.createMessageFilter(oldArgs, queue), + FilterSupport.createMessageFilter(args, queue)); } else { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); result.removeUnfilteredQueue(queue); } } else { - if(argumentsContainFilter(oldArgs)) + if(FilterSupport.argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue)); + result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue)); } else { @@ -118,7 +106,7 @@ public class TopicExchange extends AbstractExchange return; } } - + result.addBinding(binding); } @@ -129,9 +117,9 @@ public class TopicExchange extends AbstractExchange if(result == null) { result = new TopicExchangeResult(); - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { @@ -142,89 +130,22 @@ public class TopicExchange extends AbstractExchange } else { - if(argumentsContainFilter(args)) + if(FilterSupport.argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createMessageFilter(args, queue)); + result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue)); } else { result.addUnfilteredQueue(queue); } } - + result.addBinding(binding); _bindings.put(binding, args); } } - private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException - { - if(argumentsContainNoLocal(args)) - { - MessageFilter filter = new NoLocalFilter(queue); - - if(argumentsContainJMSSelector(args)) - { - filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); - } - return filter; - } - else - { - return createJMSSelectorFilter(args); - } - - } - - - private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException - { - final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); - JMSSelectorFilter selector = null; - - if(selectorRef == null || (selector = selectorRef.get())==null) - { - try - { - selector = new JMSSelectorFilter(selectorString); - } - catch (ParseException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (SelectorParsingException e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - catch (TokenMgrError e) - { - throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e); - } - _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); - } - return selector; - } - - private static boolean argumentsContainFilter(final FieldTable args) - { - return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); - } - - private static boolean argumentsContainNoLocal(final FieldTable args) - { - return args != null - && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) - && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); - } - - private static boolean argumentsContainJMSSelector(final FieldTable args) - { - return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) - && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); - } - public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -256,87 +177,6 @@ public class TopicExchange extends AbstractExchange } - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments)); - - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return o.equals(arguments); - } - else - { - return false; - } - - } - } - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - Binding binding = new Binding(null, bindingKey, queue, this, arguments); - if (arguments == null) - { - return _bindings.containsKey(binding); - } - else - { - FieldTable o = _bindings.get(binding); - if (o != null) - { - return arguments.equals(FieldTable.convertToMap(o)); - } - else - { - return false; - } - } - - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return isBound(routingKey, null, queue); - } - - public boolean isBound(AMQShortString routingKey) - { - for(Binding b : _bindings.keySet()) - { - if(b.getBindingKey().equals(routingKey.toString())) - { - return true; - } - } - - return false; - } - - public boolean isBound(AMQQueue queue) - { - for(Binding b : _bindings.keySet()) - { - if(b.getQueue().equals(queue)) - { - return true; - } - } - - return false; - } - - public boolean hasBindings() - { - return !_bindings.isEmpty(); - } - private boolean deregisterQueue(final Binding binding) { if(_bindings.containsKey(binding)) @@ -344,14 +184,15 @@ public class TopicExchange extends AbstractExchange FieldTable bindingArgs = _bindings.remove(binding); AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); TopicExchangeResult result = _topicExchangeResults.get(bindingKey); - + result.removeBinding(binding); - - if(argumentsContainFilter(bindingArgs)) + + if(FilterSupport.argumentsContainFilter(bindingArgs)) { try { - result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue())); + result.removeFilteredQueue(binding.getQueue(), FilterSupport.createMessageFilter(bindingArgs, + binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -418,96 +259,4 @@ public class TopicExchange extends AbstractExchange deregisterQueue(binding); } - private static final class NoLocalFilter implements MessageFilter - { - private final AMQQueue _queue; - - public NoLocalFilter(AMQQueue queue) - { - _queue = queue; - } - - public boolean matches(Filterable message) - { - InboundMessage inbound = (InboundMessage) message; - final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); - - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - - if (o == null || getClass() != o.getClass()) - { - return false; - } - - NoLocalFilter that = (NoLocalFilter) o; - - return _queue == null ? that._queue == null : _queue.equals(that._queue); - } - - @Override - public int hashCode() - { - return _queue != null ? _queue.hashCode() : 0; - } - } - - private static final class CompoundFilter implements MessageFilter - { - private MessageFilter _noLocalFilter; - private MessageFilter _jmsSelectorFilter; - - public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) - { - _noLocalFilter = filter; - _jmsSelectorFilter = jmsSelectorFilter; - } - - public boolean matches(Filterable message) - { - return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); - } - - @Override - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - - CompoundFilter that = (CompoundFilter) o; - - if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) - { - return false; - } - if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) - { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; - result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); - return result; - } - } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index b4eb41684d..2e6a98d81b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -159,9 +159,15 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo else { + String message = "Queue " + queueName + " not bound with routing key " + + body.getRoutingKey() + " to exchange " + exchangeName; + + if(message.length()>255) + { + message = message.substring(0,254); + } response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - new AMQShortString("Queue " + queueName + " not bound with routing key " + - body.getRoutingKey() + " to exchange " + exchangeName)); // replyText + new AMQShortString(message)); // replyText } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index d8d245e255..110c7be50a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -1130,22 +1130,22 @@ public class ServerSessionDelegate extends SessionDelegate if(queueMatched) { - result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); + final boolean keyMatched = exchange.isBound(method.getBindingKey(), queue); + result.setKeyNotMatched(!keyMatched); + if(method.hasArguments() && keyMatched) + { + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), queue)); + } } else { result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); } - if(method.hasArguments()) - { - result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null)); - } - } else if (method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null)); + result.setArgsNotMatched(!exchange.isBound(method.getArguments(), queue)); } } @@ -1166,7 +1166,7 @@ public class ServerSessionDelegate extends SessionDelegate { if(method.hasArguments()) { - result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null)); + result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments())); } result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java deleted file mode 100644 index f4c0fec6c9..0000000000 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.AMQMessage; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.MessageMetaData; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -public class AbstractHeadersExchangeTestBase extends QpidTestCase -{ - private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); - - private final HeadersExchange exchange = new HeadersExchange(); - private final Set<TestQueue> queues = new HashSet<TestQueue>(); - private VirtualHost _virtualHost; - private int count; - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - } - - @Override - public void tearDown() throws Exception - { - try - { - if (_virtualHost != null) - { - _virtualHost.close(); - } - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testDoNothing() - { - // this is here only to make junit under Eclipse happy - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - String queueName = "Queue" + (++count); - - return bind(queueName, queueName, getHeadersMap(bindings)); - } - - protected void unbind(TestQueue queue, String... bindings) throws AMQException - { - String queueName = queue.getName(); - exchange.onUnbind(new Binding(null, queueName, queue, exchange, getHeadersMap(bindings))); - } - - protected int getCount() - { - return count; - } - - private TestQueue bind(String key, String queueName, Map<String,Object> args) throws AMQException - { - TestQueue queue = new TestQueue(new AMQShortString(queueName), _virtualHost); - queues.add(queue); - exchange.onBind(new Binding(null, key, queue, exchange, args)); - return queue; - } - - - protected int route(Message m) throws AMQException - { - m.getIncomingMessage().headersReceived(System.currentTimeMillis()); - m.route(exchange); - if(m.getIncomingMessage().allContentReceived()) - { - for(BaseQueue q : m.getIncomingMessage().getDestinationQueues()) - { - q.enqueue(m); - } - } - return m.getIncomingMessage().getDestinationQueues().size(); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, false, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException - { - routeAndTest(m, expectReturn, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException - { - int queueCount = route(m); - - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - - if(expectReturn) - { - assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount); - } - - } - - static Map<String,Object> getHeadersMap(String... entries) - { - if(entries == null) - { - return null; - } - - Map<String,Object> headers = new HashMap<String,Object>(); - - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - - static final class MessagePublishInfoImpl implements MessagePublishInfo - { - private AMQShortString _exchange; - private boolean _immediate; - private boolean _mandatory; - private AMQShortString _routingKey; - - public MessagePublishInfoImpl(AMQShortString routingKey) - { - _routingKey = routingKey; - } - - public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return _exchange; - } - - public boolean isImmediate() - { - return _immediate; - - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - - public void setExchange(AMQShortString exchange) - { - _exchange = exchange; - } - - public void setImmediate(boolean immediate) - { - _immediate = immediate; - } - - public void setMandatory(boolean mandatory) - { - _mandatory = mandatory; - } - - public void setRoutingKey(AMQShortString routingKey) - { - _routingKey = routingKey; - } - } - - static MessagePublishInfo getPublishRequest(final String id) - { - return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id)); - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.setProperties(getProperties(headers)); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends SimpleAMQQueue - { - private final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); - - public String toString() - { - return getNameShortString().toString(); - } - - public TestQueue(AMQShortString name, VirtualHost host) throws AMQException - { - super(UUIDGenerator.generateRandomUUID(), name, false, new AMQShortString("test"), true, false, host, Collections.EMPTY_MAP); - host.getQueueRegistry().registerQueue(this); - } - - - - /** - * We override this method so that the default behaviour, which attempts to use a delivery manager, is - * not invoked. It is unnecessary since for this test we only care to know whether the message was - * sent to the queue; the queue processing logic is not being tested. - * @param msg - * @throws AMQException - */ - @Override - public void enqueue(ServerMessage msg, boolean sync, PostEnqueueAction action) throws AMQException - { - messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); - final QueueEntry queueEntry = new QueueEntry() - { - - public AMQQueue getQueue() - { - return null; - } - - public AMQMessage getMessage() - { - return null; - } - - public long getSize() - { - return 0; - } - - public boolean getDeliveredToConsumer() - { - return false; - } - - public boolean expired() throws AMQException - { - return false; - } - - public boolean isAvailable() - { - return false; - } - - public boolean isAcquired() - { - return false; - } - - public boolean acquire() - { - return false; - } - - public boolean acquire(Subscription sub) - { - return false; - } - - public boolean delete() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; - } - - public boolean isAcquiredBy(Subscription subscription) - { - return false; - } - - public void release() - { - - } - - public void setRedelivered() - { - - } - - public AMQMessageHeader getMessageHeader() - { - return null; - } - - public boolean isPersistent() - { - return false; - } - - public boolean isRedelivered() - { - return false; - } - - public Subscription getDeliveredSubscription() - { - return null; - } - - public void reject() - { - - } - - public boolean isRejectedBy(long subscriptionId) - { - return false; - } - - public void dequeue() - { - - } - - public void dispose() - { - - } - - public void discard() - { - - } - - public void routeToAlternate() - { - - } - - public boolean isQueueDeleted() - { - return false; - } - - public void addStateChangeListener(StateChangeListener listener) - { - - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - return false; - } - - public int compareTo(final QueueEntry o) - { - return 0; - } - - public boolean isDequeued() - { - return false; - } - - public boolean isDispensed() - { - return false; - } - - public QueueEntry getNextNode() - { - return null; - } - - public QueueEntry getNextValidEntry() - { - return null; - } - - public int getDeliveryCount() - { - return 0; - } - - public void incrementDeliveryCount() - { - } - - public void decrementDeliveryCount() - { - } - }; - - if(action != null) - { - action.onEnqueue(queueEntry); - } - - } - - boolean isInQueue(Message msg) - { - return messages.contains(msg); - } - - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static AtomicLong _messageId = new AtomicLong(); - - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final AMQProtocolSession publisher) - { - super(info); - } - - - public ContentHeaderBody getContentHeader() - { - return Message.this.getContentHeaderBody(); - } - } - - private IncomingMessage _incoming; - - - Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException - { - this(protocolSession, id, getHeaders(headers)); - } - - Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException - { - this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST); - } - - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(AMQProtocolSession protocolsession, long messageId, - MessagePublishInfo publish, - ContentHeaderBody header, - List<ContentBody> bodies) throws AMQException - { - super(new MockStoredMessage(messageId, publish, header)); - - StoredMessage<MessageMetaData> storedMessage = getStoredMessage(); - - int pos = 0; - for(ContentBody body : bodies) - { - storedMessage.addContent(pos, ByteBuffer.wrap(body.getPayload())); - pos += body.getPayload().length; - } - - _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession); - _incoming.setContentHeaderBody(header); - - - } - - - private Message(AMQMessage msg) throws AMQException - { - super(msg.getStoredMessage()); - } - - - - void route(Exchange exchange) throws AMQException - { - _incoming.enqueue(exchange.route(_incoming)); - } - - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } - } - } -} diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 2ddb417d5d..7b7e2ec346 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -21,22 +21,32 @@ package org.apache.qpid.server.exchange; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class FanoutExchangeTest extends TestCase { @@ -51,7 +61,9 @@ public class FanoutExchangeTest extends TestCase _virtualHost = mock(VirtualHost.class); SecurityManager securityManager = mock(SecurityManager.class); when(_virtualHost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseBind(any(Exchange.class),any(AMQQueue.class),any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); } @@ -76,14 +88,14 @@ public class FanoutExchangeTest extends TestCase { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, (FieldTable) null, queue)); + _exchange.isBound(new AMQShortString("matters"), (FieldTable) null, queue)); } public void testIsBoundAMQShortStringAMQQueue() throws AMQSecurityException, AMQInternalException { AMQQueue queue = bindQueue(); assertTrue("Should return true for a bound queue", - _exchange.isBound((AMQShortString) null, queue)); + _exchange.isBound(new AMQShortString("matters"), queue)); } public void testIsBoundAMQQueue() throws AMQSecurityException, AMQInternalException @@ -95,9 +107,86 @@ public class FanoutExchangeTest extends TestCase private AMQQueue bindQueue() throws AMQSecurityException, AMQInternalException { + AMQQueue queue = mockQueue(); + _exchange.addBinding("matters", queue, null); + return queue; + } + + private AMQQueue mockQueue() + { AMQQueue queue = mock(AMQQueue.class); when(queue.getVirtualHost()).thenReturn(_virtualHost); - _exchange.addBinding("does not matter", queue, null); return queue; } + + public void testRoutingWithSelectors() throws Exception + { + AMQQueue queue1 = mockQueue(); + AMQQueue queue2 = mockQueue(); + + _exchange.addBinding("key",queue1, null); + _exchange.addBinding("key",queue2, null); + + + List<? extends BaseQueue> result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); + + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + _exchange.removeBinding("key",queue2,null); + + result = _exchange.route(mockMessage(true)); + + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + result = _exchange.route(mockMessage(false)); + + assertEquals("Expected message to be routed to queue1 only", 1, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertFalse("Expected queue2 not to be routed to", result.contains(queue2)); + + _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); + + + result = _exchange.route(mockMessage(false)); + assertEquals("Expected message to be routed to both queues", 2, result.size()); + assertTrue("Expected queue1 to be routed to", result.contains(queue1)); + assertTrue("Expected queue2 to be routed to", result.contains(queue2)); + + + } + + private InboundMessage mockMessage(boolean val) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader("select")).thenReturn(true); + when(header.getHeader("select")).thenReturn(val); + when(header.getHeaderNames()).thenReturn(Collections.singleton("select")); + when(header.containsHeaders(anySet())).then(new Answer<Object>() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return names.size() == 1 && names.contains("select"); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index bd6a02d69b..2b965358e0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -20,106 +20,230 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.util.BrokerTestHelper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import junit.framework.TestCase; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anySet; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HeadersExchangeTest extends TestCase { - private AMQProtocolSession _protocolSession; + private HeadersExchange _exchange; + private VirtualHost _virtualHost; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - _protocolSession = new InternalTestProtocolSession(getVirtualHost(), BrokerTestHelper.createBrokerMock()); + + CurrentActor.setDefault(mock(LogActor.class)); + _exchange = new HeadersExchange(); + _virtualHost = mock(VirtualHost.class); + SecurityManager securityManager = mock(SecurityManager.class); + when(_virtualHost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseBind(any(Exchange.class), any(AMQQueue.class), any(AMQShortString.class))).thenReturn(true); + when(securityManager.authoriseUnbind(any(Exchange.class), any(AMQShortString.class), any(AMQQueue.class))).thenReturn(true); + + _exchange.initialise(UUID.randomUUID(), _virtualHost, AMQShortString.valueOf("test"), false, 0, false); + } - @Override - public void tearDown() throws Exception + protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception { - BrokerTestHelper.tearDown(); - super.tearDown(); + List<? extends BaseQueue> results = _exchange.route(msg); + List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); + unexpected.removeAll(Arrays.asList(expected)); + assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); + List<? extends BaseQueue> missing = new ArrayList<BaseQueue>(Arrays.asList(expected)); + missing.removeAll(results); + assertTrue("Message not delivered to expected queues: " + missing, missing.isEmpty()); + assertTrue("Duplicates " + results, results.size()==(new HashSet<BaseQueue>(results)).size()); } - public void testSimple() throws AMQException + + private AMQQueue createAndBind(final String name, String... arguments) + throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - TestQueue q4 = bindDefault("F0001=Bear"); - TestQueue q5 = bindDefault("F0000", "F0001"); - TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); - TestQueue q7 = bindDefault("F0000", "F0001=Bear"); - TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), - q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); - - Message m7 = new Message(_protocolSession, "Message7", "XXXXX"); - - MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); - pb7.setMandatory(true); - routeAndTest(m7,true); - - Message m8 = new Message(_protocolSession, "Message8", "F0000"); - MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); - pb8.setMandatory(true); - routeAndTest(m8,false,q1); + return createAndBind(name, getArgsMapFromStrings(arguments)); + } + + private Map<String, Object> getArgsMapFromStrings(String... arguments) + { + Map<String, Object> map = new HashMap<String,Object>(); + + for(String arg : arguments) + { + if(arg.contains("=")) + { + String[] keyValue = arg.split("=",2); + map.put(keyValue[0],keyValue[1]); + } + else + { + map.put(arg,null); + } + } + return map; + } + private AMQQueue createAndBind(final String name, Map<String, Object> arguments) + throws Exception + { + AMQQueue q = create(name); + bind(name, arguments, q); + return q; + } + private void bind(String bindingKey, Map<String, Object> arguments, AMQQueue q) + throws AMQSecurityException, AMQInternalException + { + _exchange.addBinding(bindingKey,q,arguments); } - public void testAny() throws AMQException + private AMQQueue create(String name) { - TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); - TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); - TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); - TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1, q3); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message(_protocolSession, "Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message(_protocolSession, "Message6", "F0002")); + AMQQueue q = mock(AMQQueue.class); + when(q.toString()).thenReturn(name); + when(q.getVirtualHost()).thenReturn(_virtualHost); + return q; } - public void testMandatory() throws AMQException + + public void testSimple() throws Exception { - bindDefault("F0000"); - Message m1 = new Message(_protocolSession, "Message1", "XXXXX"); - Message m2 = new Message(_protocolSession, "Message2", "F0000"); - MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); - pb1.setMandatory(true); - MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); - pb2.setMandatory(true); - routeAndTest(m1,true); + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + AMQQueue q4 = createAndBind("Q4", "F0001=Bear"); + AMQQueue q5 = createAndBind("Q5", "F0000", "F0001"); + AMQQueue q6 = createAndBind("Q6", "F0000=Aardvark", "F0001=Bear"); + AMQQueue q7 = createAndBind("Q7", "F0000", "F0001=Bear"); + AMQQueue q8 = createAndBind("Q8", "F0000=Aardvark", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q5, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q3, q4, q5, q7); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), + q1, q2, q3, q4, q5, q6, q7, q8); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } - - public void testOnUnbind() throws AMQException + + public void testAny() throws Exception { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - - routeAndTest(new Message(_protocolSession, "Message1", "F0000"), q1); - routeAndTest(new Message(_protocolSession, "Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message(_protocolSession, "Message3", "F0001"), q3); - - unbind(q1,"F0000"); - routeAndTest(new Message(_protocolSession, "Message4", "F0000")); - routeAndTest(new Message(_protocolSession, "Message5", "F0000=Aardvark"), q2); + AMQQueue q1 = createAndBind("Q1", "F0000", "F0001", "X-match=any"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark", "F0001=Bear", "X-match=any"); + AMQQueue q3 = createAndBind("Q3", "F0000", "F0001=Bear", "X-match=any"); + AMQQueue q4 = createAndBind("Q4", "F0000=Aardvark", "F0001", "X-match=any"); + AMQQueue q5 = createAndBind("Q5", "F0000=Apple", "F0001", "X-match=any"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1, q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2, q3, q4); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark", "F0001=Bear")), q1, q2, q3, q4, q5); + routeAndTest(mockMessage(getArgsMapFromStrings("F0002"))); + } + + public void testOnUnbind() throws Exception + { + AMQQueue q1 = createAndBind("Q1", "F0000"); + AMQQueue q2 = createAndBind("Q2", "F0000=Aardvark"); + AMQQueue q3 = createAndBind("Q3", "F0001"); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000")), q1); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); + routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3); + + _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000")); + + routeAndTest(mockMessage(getArgsMapFromStrings("F0000"))); + routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2); + } + + + public void testWithSelectors() throws Exception + { + AMQQueue q1 = create("Q1"); + AMQQueue q2 = create("Q2"); + bind("q1",getArgsMapFromStrings("F"), q1); + bind("q1select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q1); + bind("q2",getArgsMapFromStrings("F=1"), q2); + + routeAndTest(mockMessage(getArgsMapFromStrings("F")),q1); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2); + + + AMQQueue q3 = create("Q3"); + bind("q3select",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='1'"), q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=1")),q1,q2,q3); + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1); + bind("q3select2",getArgsMapFromStrings("F", AMQPFilterTypes.JMS_SELECTOR.toString()+"=F='2'"), q3); + + routeAndTest(mockMessage(getArgsMapFromStrings("F=2")),q1,q3); + + } + + private InboundMessage mockMessage(final Map<String, Object> headerValues) + { + final AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.containsHeader(anyString())).then(new Answer<Boolean>() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.containsKey((String) invocation.getArguments()[0]); + } + }); + when(header.getHeader(anyString())).then(new Answer<Object>() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + return headerValues.get((String) invocation.getArguments()[0]); + } + }); + when(header.getHeaderNames()).thenReturn(headerValues.keySet()); + when(header.containsHeaders(anySet())).then(new Answer<Boolean>() + { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + final Set names = (Set) invocation.getArguments()[0]; + return headerValues.keySet().containsAll(names); + + } + }); + final InboundMessage inboundMessage = mock(InboundMessage.class); + when(inboundMessage.getMessageHeader()).thenReturn(header); + return inboundMessage; } - public static junit.framework.Test suite() { |
