From 178745059c1265f8cae71f8b19caf448b580afb0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 18 Apr 2014 16:03:11 +0000 Subject: QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Bindings git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588501 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/binding/BindingFactory.java | 48 ++++++++++++++++++++++ .../apache/qpid/server/binding/BindingImpl.java | 18 +++++++- .../qpid/server/exchange/AbstractExchange.java | 32 +++++++-------- .../apache/qpid/server/exchange/ExchangeImpl.java | 5 +-- .../server/model/ConfiguredObjectFactoryImpl.java | 4 ++ .../qpid/server/virtualhost/BindingRecoverer.java | 28 +++++++++++-- ....qpid.server.plugin.ConfiguredObjectTypeFactory | 1 + .../DurableConfigurationRecovererTest.java | 32 +++++++++++++++ 8 files changed, 145 insertions(+), 23 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java new file mode 100644 index 0000000000..4050f7675e --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.binding; + +import java.util.Map; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; + +public class BindingFactory extends AbstractConfiguredObjectTypeFactory +{ + public BindingFactory() + { + super(BindingImpl.class); + } + + @Override + protected BindingImpl createInstance(final Map attributes, final ConfiguredObject... parents) + { + ExchangeImpl exchange = (ExchangeImpl) getParent(Exchange.class, parents); + AMQQueue queue = (AMQQueue) getParent(Queue.class, parents); + BindingImpl binding = new BindingImpl(attributes, queue, exchange); + exchange.addBinding(binding); + return binding; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 634b250d31..a533c0bc75 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -81,7 +82,11 @@ public class BindingImpl public BindingImpl(UUID id, Map attributes, AMQQueue queue, ExchangeImpl exchange) { - super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor()); + this(enhanceWithDurable(combineIdWithAttributes(id,attributes), queue, exchange), queue, exchange); + } + public BindingImpl(Map attributes, AMQQueue queue, ExchangeImpl exchange) + { + super(parentsMap(queue,exchange),attributes,queue.getVirtualHost().getTaskExecutor()); _bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME); _queue = queue; _exchange = exchange; @@ -99,6 +104,17 @@ public class BindingImpl } + @Override + protected void onCreate() + { + super.onCreate(); + if (isDurable()) + { + DurableConfigurationStoreHelper.createBinding(_queue.getVirtualHost().getDurableConfigurationStore(), this); + } + + } + private static Map enhanceWithDurable(Map attributes, final AMQQueue queue, final ExchangeImpl exchange) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index b1dd6a3721..4f1643967e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -626,12 +626,6 @@ public abstract class AbstractExchange> true); } - @Override - public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, - final Map argumentMap) - { - makeBinding(id, bindingKey,queue, argumentMap,true, false); - } private void removeBinding(final BindingImpl binding) { @@ -713,18 +707,10 @@ public abstract class AbstractExchange> if (existingMapping == null) { BindingImpl b = new BindingImpl(id, attributes, queue, this); - b.addStateChangeListener(_bindingListener); - b.open(); + b.create(); - if (b.isDurable() && !restore) - { - DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b); - } - _bindingsMap.put(bindingIdentifier, b); - queue.addBinding(b); - childAdded(b); + addBinding(b); - doAddBinding(b); return true; } @@ -742,6 +728,20 @@ public abstract class AbstractExchange> } } + @Override + public void addBinding(final BindingImpl b) + { + b.addStateChangeListener(_bindingListener); + + BindingIdentifier identifier = new BindingIdentifier(b.getName(), b.getAMQQueue()); + + _bindingsMap.put(identifier, b); + b.getAMQQueue().addBinding(b); + childAdded(b); + + doAddBinding(b); + } + protected abstract void onBindingUpdated(final BindingImpl binding, final Map oldArguments); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index 57929b7306..38913762d8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -59,9 +59,6 @@ public interface ExchangeImpl> extends Exchange, Ex AMQQueue queue, Map arguments); - void restoreBinding(UUID id, String bindingKey, AMQQueue queue, - Map argumentMap); - void delete(); /** @@ -114,6 +111,8 @@ public interface ExchangeImpl> extends Exchange, Ex EventLogger getEventLogger(); + void addBinding(BindingImpl binding); + public interface BindingListener { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java index 57062cb7a2..54a23a0389 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java @@ -63,6 +63,10 @@ public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory { _defaultTypes.put(categoryName, annotation.defaultType()); } + else + { + _defaultTypes.put(categoryName, categoryName); + } } if(categoryFactories.put(factory.getType(),factory) != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index 6e399d950e..526760aea7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,11 +30,16 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; @@ -41,11 +47,14 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer _virtualHost; + private final ConfiguredObjectFactory _objectFactory; - public BindingRecoverer(final VirtualHostImpl virtualHost) + public BindingRecoverer(final VirtualHostImpl virtualHost) { _virtualHost = virtualHost; + Broker broker = _virtualHost.getParent(Broker.class); + _objectFactory = broker.getObjectFactory(); } @Override @@ -67,6 +76,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer _unresolvedDependencies = new ArrayList(); @@ -76,6 +86,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer) record.getAttributes().get(org.apache.qpid.server.model.Binding.ARGUMENTS); } @@ -108,7 +120,17 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer attributesWithId = new HashMap(_record.getAttributes()); + attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,_record.getId()); + attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true); + + ConfiguredObjectTypeFactory configuredObjectTypeFactory = + _objectFactory.getConfiguredObjectTypeFactory(Binding.class, attributesWithId); + UnresolvedConfiguredObject unresolvedConfiguredObject = + configuredObjectTypeFactory.recover(_record, _exchange, _queue); + Binding binding = (Binding) unresolvedConfiguredObject.resolve(); + } return (_exchange).getBinding(_bindingName, _queue); } diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory index cc000d49e6..3ec2b900e3 100644 --- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory +++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory @@ -44,6 +44,7 @@ org.apache.qpid.server.exchange.DirectExchangeFactory org.apache.qpid.server.exchange.FanoutExchangeFactory org.apache.qpid.server.exchange.HeadersExchangeFactory org.apache.qpid.server.exchange.TopicExchangeFactory +org.apache.qpid.server.binding.BindingFactory diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 2822476b1c..49b2a61965 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -84,6 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private ConfiguredObjectFactory _configuredObjectFactory; private ConfiguredObjectTypeFactory _exchangeFactory; private ConfiguredObjectTypeFactory _queueFactory; + private ConfiguredObjectTypeFactory _bindingFactory; @Override public void setUp() throws Exception @@ -92,6 +93,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _configuredObjectFactory = mock(ConfiguredObjectFactory.class); _exchangeFactory = mock(ConfiguredObjectTypeFactory.class); _queueFactory = mock(ConfiguredObjectTypeFactory.class); + _bindingFactory = mock(ConfiguredObjectTypeFactory.class); + AMQQueue queue = mock(AMQQueue.class); @@ -109,6 +112,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory); when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Queue.class), anyMap())).thenReturn(_queueFactory); + when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Binding.class), anyMap())).thenReturn(_bindingFactory); + final ArgumentCaptor recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); @@ -169,6 +174,33 @@ public class DurableConfigurationRecovererTest extends QpidTestCase }).when(_queueFactory).recover(recoveredQueue.capture(), any(ConfiguredObject.class)); + final ArgumentCaptor recoveredBinding = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); + final ArgumentCaptor parent1 = ArgumentCaptor.forClass(ConfiguredObject.class); + final ArgumentCaptor parent2 = ArgumentCaptor.forClass(ConfiguredObject.class); + + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecord queueRecord = recoveredBinding.getValue(); + Binding binding = mock(Binding.class); + UUID id = queueRecord.getId(); + String name = (String) queueRecord.getAttributes().get("name"); + when(binding.getId()).thenReturn(id); + when(binding.getName()).thenReturn(name); + + UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class); + when(unresolved.resolve()).thenReturn(binding); + + + return unresolved; + } + }).when(_bindingFactory).recover(recoveredBinding.capture(), parent1.capture(), parent2.capture()); + + + DurableConfiguredObjectRecoverer[] recoverers = { new QueueRecoverer(_vhost), new ExchangeRecoverer(_vhost), -- cgit v1.2.1