diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-24 01:04:25 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-24 01:04:25 +0000 |
| commit | d440e9884326185866044492f96c90cd993a1cfe (patch) | |
| tree | 1fc44d2d82c65741129d040a86a87127507db553 /qpid/java | |
| parent | 9e0e8ef7be2cd693d64d8c3d718a4cfab6bda789 (diff) | |
| download | qpid-python-d440e9884326185866044492f96c90cd993a1cfe.tar.gz | |
QPID-5582 : [Java Broker] only allow one binding per binding-key and queue at an exchange
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1571124 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
18 files changed, 197 insertions, 123 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java index 1e3bafa39e..bde4cbac2b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -21,11 +21,18 @@ package org.apache.qpid.server.binding; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.BindingMessages; +import org.apache.qpid.server.logging.subjects.BindingLogSubject; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.util.StateChangeListener; import java.util.Collections; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class Binding @@ -36,6 +43,13 @@ public class Binding private final Map<String, Object> _arguments; private final UUID _id; private final AtomicLong _matches = new AtomicLong(); + private final BindingLogSubject _logSubject; + //TODO : persist creation time + private long _createTime = System.currentTimeMillis(); + final AtomicBoolean _deleted = new AtomicBoolean(); + final CopyOnWriteArrayList<StateChangeListener<Binding,State>> _stateChangeListeners = + new CopyOnWriteArrayList<StateChangeListener<Binding, State>>(); + public Binding(UUID id, final String bindingKey, @@ -51,6 +65,11 @@ public class Binding //Perform ACLs queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + _logSubject = new BindingLogSubject(bindingKey,exchange,queue); + CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), + getArguments() != null + && !getArguments().isEmpty())); + } @@ -89,11 +108,16 @@ public class Binding return _matches.get(); } - boolean isDurable() + public boolean isDurable() { return _queue.isDurable() && _exchange.isDurable(); } + public long getCreateTime() + { + return _createTime; + } + @Override public boolean equals(final Object o) { @@ -128,4 +152,30 @@ public class Binding return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }"; } + public void delete() + { + if(_deleted.compareAndSet(false,true)) + { + for(StateChangeListener<Binding,State> listener : _stateChangeListeners) + { + listener.stateChanged(this, State.ACTIVE, State.DELETED); + } + CurrentActor.get().message(_logSubject, BindingMessages.DELETED()); + } + } + + public State getState() + { + return _deleted.get() ? State.DELETED : State.ACTIVE; + } + + public void addStateChangeListener(StateChangeListener<Binding,State> listener) + { + _stateChangeListeners.add(listener); + } + + public void removeStateChangeListener(StateChangeListener<Binding,State> listener) + { + _stateChangeListeners.remove(listener); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 0131ad0458..9ffc305300 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -26,15 +26,15 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; -import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -44,6 +44,7 @@ import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -82,7 +83,7 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>(); - private UUID _id; + private final UUID _id; private final AtomicInteger _bindingCountHigh = new AtomicInteger(); private final AtomicLong _receivedMessageCount = new AtomicLong(); private final AtomicLong _receivedMessageSize = new AtomicLong(); @@ -93,8 +94,12 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T private final CopyOnWriteArrayList<Exchange.BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>(); + private final ConcurrentHashMap<BindingIdentifier, Binding> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, Binding>(); + + //TODO : persist creation time private long _createTime = System.currentTimeMillis(); + private StateChangeListener<Binding, State> _bindingListener; public AbstractExchange(VirtualHost vhost, Map<String, Object> attributes) throws UnknownExchangeException { @@ -142,7 +147,17 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T } } - + _bindingListener = new StateChangeListener<Binding, State>() + { + @Override + public void stateChanged(final Binding binding, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + removeBinding(binding); + } + } + }; // Log Exchange creation CurrentActor.get().message(ExchangeMessages.CREATED(getExchangeType().getType(), _name, _durable)); } @@ -173,7 +188,8 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T List<Binding> bindings = new ArrayList<Binding>(_bindings); for(Binding binding : bindings) { - removeBinding(binding); + binding.removeStateChangeListener(_bindingListener); + binding.delete(); } if(_alternateExchange != null) @@ -571,30 +587,22 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T makeBinding(id, bindingKey,queue, argumentMap,true, false); } - @Override - public void removeBinding(final Binding b) + private void removeBinding(final Binding binding) { - removeBinding(b.getBindingKey(), b.getQueue(), b.getArguments()); - } + String bindingKey = binding.getBindingKey(); + AMQQueue queue = binding.getQueue(); - @Override - public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) - { assert queue != null; if (bindingKey == null) { bindingKey = ""; } - if (arguments == null) - { - arguments = Collections.emptyMap(); - } // Check access _virtualHost.getSecurityManager().authoriseUnbind(this, bindingKey, queue); - BindingImpl b = _bindingsMap.remove(new BindingImpl(null, bindingKey,queue,arguments)); + Binding b = _bindingsMap.remove(new BindingIdentifier(bindingKey,queue)); if (b != null) { @@ -605,15 +613,14 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T { DurableConfigurationStoreHelper.removeBinding(_virtualHost.getDurableConfigurationStore(), b); } - b.logDestruction(); + b.delete(); } - return b; } @Override - public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) + public Binding getBinding(String bindingKey, AMQQueue queue) { assert queue != null; @@ -622,17 +629,9 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T bindingKey = ""; } - if(arguments == null) - { - arguments = Collections.emptyMap(); - } - - BindingImpl b = new BindingImpl(null, bindingKey,queue,arguments); - return _bindingsMap.get(b); + return _bindingsMap.get(new BindingIdentifier(bindingKey,queue)); } - private final ConcurrentHashMap<BindingImpl, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingImpl, BindingImpl>(); - private boolean makeBinding(UUID id, String bindingKey, AMQQueue queue, @@ -658,13 +657,14 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T bindingKey, _virtualHost.getName()); } - BindingImpl b = new BindingImpl(id, bindingKey, queue, arguments); - BindingImpl existingMapping = _bindingsMap.putIfAbsent(b, b); + Binding b = new Binding(id, bindingKey, queue, this, arguments); + Binding existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b); if (existingMapping == null || force) { + b.addStateChangeListener(_bindingListener); if (existingMapping != null) { - removeBinding(existingMapping); + existingMapping.delete(); } if (b.isDurable() && !restore) @@ -674,7 +674,6 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T queue.addBinding(b); doAddBinding(b); - b.logCreation(); return true; } @@ -684,56 +683,61 @@ public abstract class AbstractExchange<T extends Exchange> implements Exchange<T } } - private final class BindingImpl extends Binding + + private static final class BindingIdentifier { - private final BindingLogSubject _logSubject; - //TODO : persist creation time - private long _createTime = System.currentTimeMillis(); + private final String _bindingKey; + private final AMQQueue _destination; - private BindingImpl(UUID id, - String bindingKey, - final AMQQueue queue, - final Map<String, Object> arguments) + private BindingIdentifier(final String bindingKey, final AMQQueue destination) { - super(id, bindingKey, queue, AbstractExchange.this, arguments); - _logSubject = new BindingLogSubject(bindingKey,AbstractExchange.this,queue); - + _bindingKey = bindingKey; + _destination = destination; } - public void onClose(final Exchange exchange) + public String getBindingKey() { - removeBinding(this); + return _bindingKey; } - void logCreation() + public AMQQueue getDestination() { - CurrentActor.get().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), - getArguments() != null - && !getArguments().isEmpty())); + return _destination; } - void logDestruction() + @Override + public boolean equals(final Object o) { - CurrentActor.get().message(_logSubject, BindingMessages.DELETED()); - } + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } - public String getOrigin() - { - return (String) getArguments().get("qpid.fed.origin"); - } + final BindingIdentifier that = (BindingIdentifier) o; - public long getCreateTime() - { - return _createTime; + if (!_bindingKey.equals(that._bindingKey)) + { + return false; + } + if (!_destination.equals(that._destination)) + { + return false; + } + + return true; } - public boolean isDurable() + @Override + public int hashCode() { - return getQueue().isDurable() && getExchange().isDurable(); + int result = _bindingKey.hashCode(); + result = 31 * result + _destination.hashCode(); + return result; } - } - - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 764c169781..440f0f225f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -42,6 +43,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange<DirectExchange> @@ -123,21 +125,9 @@ public class DefaultExchange implements Exchange<DirectExchange> } @Override - public void removeBinding(Binding b) + public Binding getBinding(String bindingKey, AMQQueue queue) { - throw new AccessControlException("Cannot remove bindings to the default exchange"); - } - - @Override - public Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) - { - throw new AccessControlException("Cannot remove bindings to the default exchange"); - } - - @Override - public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) - { - if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty())) + if(_virtualHost.getQueue(bindingKey) == queue) { return convertToBinding(queue); } @@ -157,7 +147,9 @@ public class DefaultExchange implements Exchange<DirectExchange> queueName, _virtualHost.getName()); - return new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP); + final Binding binding = new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP); + binding.addStateChangeListener(STATE_CHANGE_LISTENER); + return binding; } @Override @@ -346,4 +338,16 @@ public class DefaultExchange implements Exchange<DirectExchange> } } + private static final StateChangeListener<Binding, State> STATE_CHANGE_LISTENER = + new StateChangeListener<Binding, State>() + { + @Override + public void stateChanged(final Binding object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + throw new AccessControlException("Cannot remove bindings to the default exchange"); + } + } + }; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 7902892a46..613a827462 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -72,11 +72,7 @@ public interface Exchange<T extends Exchange> extends ExchangeReferrer, MessageD void restoreBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> argumentMap); - void removeBinding(Binding b); - - Binding removeBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments); - - Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments); + Binding getBinding(String bindingKey, AMQQueue queue); void close(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java index e53db4a2da..5983596b17 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java @@ -71,12 +71,12 @@ final class BindingAdapter extends AbstractConfiguredObject<BindingAdapter> impl public State getState() { - return null; //TODO + return _binding.getState(); } public boolean isDurable() { - return _binding.getQueue().isDurable() && _binding.getExchange().isDurable(); + return _binding.isDurable(); } public void setDurable(final boolean durable) @@ -131,7 +131,7 @@ final class BindingAdapter extends AbstractConfiguredObject<BindingAdapter> impl public void delete() { - _exchange.getExchange().removeBinding(_binding); + _binding.delete(); } @Override @@ -147,11 +147,11 @@ final class BindingAdapter extends AbstractConfiguredObject<BindingAdapter> impl } else if(STATE.equals(name)) { - + return getState(); } else if(DURABLE.equals(name)) { - return _queue.isDurable() && _exchange.isDurable(); + return isDurable(); } else if(LIFETIME_POLICY.equals(name)) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index 3c38ba89c8..704be692b4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -122,7 +122,7 @@ final class ExchangeAdapter extends AbstractConfiguredObject<ExchangeAdapter> im if(!_exchange.addBinding(bindingKey, amqQueue, bindingArguments)) { - Binding oldBinding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments); + Binding oldBinding = _exchange.getBinding(bindingKey, amqQueue); Map<String, Object> oldArgs = oldBinding.getArguments(); if((oldArgs == null && !bindingArguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(bindingArguments))) @@ -130,7 +130,7 @@ final class ExchangeAdapter extends AbstractConfiguredObject<ExchangeAdapter> im _exchange.replaceBinding(oldBinding.getId(), bindingKey, amqQueue, bindingArguments); } } - Binding binding = _exchange.getBinding(bindingKey, amqQueue, bindingArguments); + Binding binding = _exchange.getBinding(bindingKey, amqQueue); synchronized (_bindingAdapters) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index ac118e76a3..1f62525794 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1535,7 +1535,7 @@ abstract class AbstractQueue<E extends QueueEntryImpl<E,Q,L>, for (Binding b : bindingCopy) { - b.getExchange().removeBinding(b); + b.delete(); } QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index 948fa77048..02a6ed75fb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -106,14 +106,14 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B @Override public Binding resolve() { - if(_exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap) == null) + if(_exchange.getBinding(_bindingName, _queue) == null) { _logger.info("Restoring binding: (Exchange: " + _exchange.getName() + ", Queue: " + _queue.getName() + ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")"); _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap); } - return _exchange.getBinding(_bindingName, _queue, _bindingArgumentsMap); + return _exchange.getBinding(_bindingName, _queue); } private class QueueDependency implements UnresolvedDependency<AMQQueue> diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 014279ddf0..487b512180 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -143,7 +142,7 @@ public class FanoutExchangeTest extends TestCase assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); - _exchange.removeBinding("key",queue2,null); + _exchange.getBinding("key",queue2).delete(); result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 1822992644..05569994d7 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -24,7 +24,10 @@ import java.util.Collection; import junit.framework.TestCase; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.*; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -137,6 +140,7 @@ public class HeadersBindingTest extends TestCase private MockHeader matchHeaders = new MockHeader(); private int _count = 0; private AMQQueue _queue; + private Exchange _exchange; protected void setUp() { @@ -145,6 +149,9 @@ public class HeadersBindingTest extends TestCase VirtualHost vhost = mock(VirtualHost.class); when(_queue.getVirtualHost()).thenReturn(vhost); when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); + CurrentActor.set(mock(LogActor.class)); + _exchange = mock(Exchange.class); + when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); } protected String getQueueName() @@ -158,7 +165,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -169,7 +176,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -179,7 +186,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Altered value of A"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertFalse(new HeadersBinding(b).matches(matchHeaders)); } @@ -190,7 +197,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -202,7 +209,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertFalse(new HeadersBinding(b).matches(matchHeaders)); } @@ -215,7 +222,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -229,7 +236,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("B", "Value of B"); matchHeaders.setString("C", "Value of C"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -243,7 +250,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("B", "Altered value of B"); matchHeaders.setString("C", "Value of C"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertFalse(new HeadersBinding(b).matches(matchHeaders)); } @@ -254,7 +261,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -266,7 +273,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -279,7 +286,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("A", "Value of A"); matchHeaders.setString("B", "Value of B"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -293,7 +300,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("B", "Value of B"); matchHeaders.setString("C", "Value of C"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -307,7 +314,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("B", "Altered value of B"); matchHeaders.setString("C", "Value of C"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertTrue(new HeadersBinding(b).matches(matchHeaders)); } @@ -321,7 +328,7 @@ public class HeadersBindingTest extends TestCase matchHeaders.setString("B", "Altered value of B"); matchHeaders.setString("C", "Value of C"); - Binding b = new Binding(null, getQueueName(), _queue, null, bindHeaders); + Binding b = new Binding(null, getQueueName(), _queue, _exchange, bindHeaders); assertFalse(new HeadersBinding(b).matches(matchHeaders)); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 58ba6c9140..420b26a756 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -179,7 +178,7 @@ public class HeadersExchangeTest extends TestCase routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q1, q2); routeAndTest(mockMessage(getArgsMapFromStrings("F0001")), q3); - _exchange.removeBinding("Q1",q1,getArgsMapFromStrings("F0000")); + _exchange.getBinding("Q1",q1).delete(); routeAndTest(mockMessage(getArgsMapFromStrings("F0000"))); routeAndTest(mockMessage(getArgsMapFromStrings("F0000=Aardvark")), q2); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index d5e7a92e8a..5a4db7a2bf 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -143,7 +143,7 @@ abstract class AbstractQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends assertEquals("Wrong exchange bound", _exchange, _queue.getBindings().get(0).getExchange()); - _exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP); + _exchange.getBinding(_routingKey, _queue).delete(); assertFalse("Routing key was still bound", _exchange.isBound(_routingKey)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 96d9ee429d..4cbe385fed 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -40,12 +40,15 @@ import org.apache.commons.configuration.Configuration; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; @@ -92,6 +95,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest _queueId = UUIDGenerator.generateRandomUUID(); _exchangeId = UUIDGenerator.generateRandomUUID(); + CurrentActor.set(mock(LogActor.class)); _storeName = getName(); _storePath = TMP_FOLDER + File.separator + _storeName; FileUtils.delete(new File(_storePath), true); @@ -112,6 +116,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(_exchange.getName()).thenReturn(EXCHANGE_NAME); when(_exchange.getId()).thenReturn(_exchangeId); + when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class)); when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( _storePath); when(_virtualHost.getAttribute(eq(VirtualHost.STORE_PATH))).thenReturn(_storePath); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8caf4b3ab5..e70b34a426 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,8 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.binding.*; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.store.StoreException; @@ -989,7 +991,11 @@ public class ServerSessionDelegate extends SessionDelegate { try { - exchange.removeBinding(method.getBindingKey(), queue, null); + Binding binding = exchange.getBinding(method.getBindingKey(), queue); + if(binding != null) + { + binding.delete(); + } } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 78278b09c8..7ac71babf3 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -121,7 +121,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if(!exch.addBinding(bindingKey, queue, arguments) && TopicExchange.TYPE.equals(exch.getExchangeType())) { - Binding oldBinding = exch.getBinding(bindingKey, queue, arguments); + Binding oldBinding = exch.getBinding(bindingKey, queue); Map<String, Object> oldArgs = oldBinding.getArguments(); if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments))) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index f44f831f68..f6dbd0cee0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -25,12 +25,12 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueUnbindBody; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -100,7 +100,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); } - if(exch.getBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())) == null) + if(exch.getBinding(String.valueOf(routingKey), queue) == null) { throw body.getChannelException(AMQConstant.NOT_FOUND,"No such binding"); } @@ -108,7 +108,11 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { try { - exch.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(body.getArguments())); + Binding binding = exch.getBinding(String.valueOf(routingKey), queue); + if(binding != null) + { + binding.delete(); + } } catch (AccessControlException e) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 5d9cd4b80a..be6eba3a72 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -232,7 +232,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } for(Binding existingBinding : bindingsToRemove) { - existingBinding.getExchange().removeBinding(existingBinding); + existingBinding.delete(); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index c44216afac..eba5a6dd46 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -56,7 +56,6 @@ import org.apache.qpid.server.queue.StandardQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -795,7 +794,8 @@ public class MessageStoreTest extends QpidTestCase try { - exchange.removeBinding(routingKey, queue, bindArguments); + Binding b = exchange.getBinding(routingKey, queue); + b.delete(); } catch (Exception e) { |
