summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-04-18 13:37:32 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-04-18 13:37:32 +0000
commitef4adc559cc4b81a0c681807986c62fc0b9a13e4 (patch)
treea66bb7845d9074bb79253f11bd5636c9a30e5324 /qpid/java
parent36c6512134a729f2f7abb1fa6469a63b743dad1b (diff)
downloadqpid-python-ef4adc559cc4b81a0c681807986c62fc0b9a13e4.tar.gz
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java61
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java103
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java45
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java273
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java39
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java25
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java46
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java102
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java37
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java40
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java90
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java84
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java169
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java72
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java38
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java34
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java24
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java23
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java78
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java27
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java27
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java4
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java14
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java9
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java54
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java16
70 files changed, 1101 insertions, 1003 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java
deleted file mode 100644
index 4b2655e8c5..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.startup;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.UnresolvedObject;
-
-public interface UnresolvedObjectWithParents<X> extends UnresolvedObject<X>
-{
- void resolvedParent(final UnresolvedParentDependency<?> unresolvedParentDependency,
- final ConfiguredObject<?> dependency);
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java
deleted file mode 100644
index ae10106270..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.startup;
-
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.store.UnresolvedDependency;
-
-import java.util.UUID;
-
-class UnresolvedParentDependency<X extends ConfiguredObject<X>> implements UnresolvedDependency<X>
-{
- private final UUID _id;
- private final String _type;
- private final UnresolvedObjectWithParents _unresolvedObject;
-
- public UnresolvedParentDependency(final UnresolvedObjectWithParents unresolvedObject,
- final String type,
- final ConfiguredObjectRecord record)
- {
- _type = type;
- _id = record.getId();
- _unresolvedObject = unresolvedObject;
- }
-
- @Override
- public UUID getId()
- {
- return _id;
- }
-
- @Override
- public String getType()
- {
- return _type;
- }
-
- @Override
- public void resolve(final X dependency)
- {
- _unresolvedObject.resolvedParent(this, dependency);
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
index e6e4d0052b..b21ecd3811 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
@@ -784,7 +784,7 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore
UUID id = null;
if (idAsString == null)
{
- id = UUIDGenerator.generateRandomUUID();
+ id = UUID.randomUUID();
_generatedObjectIdDuringLoad = true;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index a04635ec74..c2be3ee3cf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -194,6 +194,10 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
_id = uuid;
_name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this);
+ if(_name == null)
+ {
+ throw new IllegalArgumentException("The name attribute is mandatory for " + getClass().getSimpleName() + " creation.");
+ }
_attributeTypes = getAttributeTypes(getClass());
_automatedFields = getAutomatedFields(getClass());
@@ -267,6 +271,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
if(attr.getAnnotation().mandatory() && !(_attributes.containsKey(attr.getName())
|| !"".equals(attr.getAnnotation().defaultValue())))
{
+ deleted();
throw new IllegalArgumentException("Mandatory attribute " + attr.getName() + " not supplied for instance of " + getClass().getName());
}
}
@@ -367,9 +372,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if(_open.compareAndSet(false,true))
{
- doResolution();
- doValidation();
- doOpening();
+ doResolution(true);
+ doValidation(true);
+ doOpening(true);
}
}
@@ -378,75 +383,87 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if(_open.compareAndSet(false,true))
{
- doResolution();
- doValidation();
- doCreation();
- doOpening();
+ doResolution(true);
+ doValidation(true);
+ doCreation(true);
+ doOpening(true);
}
}
- protected void doOpening()
+ protected void doOpening(final boolean skipCheck)
{
- onOpen();
- applyToChildren(new Action<ConfiguredObject<?>>()
+ if(skipCheck || _open.compareAndSet(false,true))
{
- @Override
- public void performAction(final ConfiguredObject<?> child)
+ onOpen();
+ applyToChildren(new Action<ConfiguredObject<?>>()
{
- if(child instanceof AbstractConfiguredObject)
+ @Override
+ public void performAction(final ConfiguredObject<?> child)
{
- ((AbstractConfiguredObject)child).doOpening();
+ if (child instanceof AbstractConfiguredObject)
+ {
+ ((AbstractConfiguredObject) child).doOpening(false);
+ }
}
- }
- });
+ });
+ }
}
- protected final void doValidation()
+ protected final void doValidation(final boolean skipCheck)
{
- applyToChildren(new Action<ConfiguredObject<?>>()
+ if(skipCheck || !_open.get())
{
- @Override
- public void performAction(final ConfiguredObject<?> child)
+ applyToChildren(new Action<ConfiguredObject<?>>()
{
- if(child instanceof AbstractConfiguredObject)
+ @Override
+ public void performAction(final ConfiguredObject<?> child)
{
- ((AbstractConfiguredObject)child).doValidation();
+ if (child instanceof AbstractConfiguredObject)
+ {
+ ((AbstractConfiguredObject) child).doValidation(false);
+ }
}
- }
- });
- validate();
+ });
+ validate();
+ }
}
- protected final void doResolution()
+ protected final void doResolution(final boolean skipCheck)
{
- resolve();
- applyToChildren(new Action<ConfiguredObject<?>>()
+ if(skipCheck || !_open.get())
{
- @Override
- public void performAction(final ConfiguredObject<?> child)
+ resolve();
+ applyToChildren(new Action<ConfiguredObject<?>>()
{
- if(child instanceof AbstractConfiguredObject)
+ @Override
+ public void performAction(final ConfiguredObject<?> child)
{
- ((AbstractConfiguredObject)child).doResolution();
+ if (child instanceof AbstractConfiguredObject)
+ {
+ ((AbstractConfiguredObject) child).doResolution(false);
+ }
}
- }
- });
+ });
+ }
}
- protected final void doCreation()
+ protected final void doCreation(final boolean skipCheck)
{
- onCreate();
- applyToChildren(new Action<ConfiguredObject<?>>()
+ if(skipCheck || !_open.get())
{
- @Override
- public void performAction(final ConfiguredObject<?> child)
+ onCreate();
+ applyToChildren(new Action<ConfiguredObject<?>>()
{
- if(child instanceof AbstractConfiguredObject)
+ @Override
+ public void performAction(final ConfiguredObject<?> child)
{
- ((AbstractConfiguredObject)child).doCreation();
+ if (child instanceof AbstractConfiguredObject)
+ {
+ ((AbstractConfiguredObject) child).doCreation(false);
+ }
}
- }
- });
+ });
+ }
}
private void applyToChildren(Action<ConfiguredObject<?>> action)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
index a965044c90..38f885717f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
@@ -32,7 +32,7 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf
{
private final Class<X> _clazz;
- protected AbstractConfiguredObjectTypeFactory(final Class<X> clazz)
+ public AbstractConfiguredObjectTypeFactory(final Class<X> clazz)
{
_clazz = clazz;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
index 5c4ed71902..e7fb50be87 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
@@ -46,6 +46,6 @@ public final class ConfiguredObjectAttribute<C extends ConfiguredObject, T> exte
public T convert(final Object value, C object)
{
- return _converter.convert(value, object);
+ return getConverter().convert(value, object);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java
index da1edd7da2..2e1350981c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java
@@ -26,18 +26,18 @@ import java.lang.reflect.Method;
abstract class ConfiguredObjectAttributeOrStatistic<C extends ConfiguredObject, T>
{
- protected final String _name;
- protected final Class<T> _type;
- protected final AttributeValueConverter<T> _converter;
- protected final Method _getter;
+ private final String _name;
+ private final Class<T> _type;
+ private final AttributeValueConverter<T> _converter;
+ private final Method _getter;
ConfiguredObjectAttributeOrStatistic(final Method getter)
{
_getter = getter;
_type = (Class<T>) getTypeFromMethod(getter);
- _name = getNameFromMethod(getter, _type);
- _converter = AttributeValueConverter.getConverter(_type, getter.getGenericReturnType());
+ _name = getNameFromMethod(getter, getType());
+ _converter = AttributeValueConverter.getConverter(getType(), getter.getGenericReturnType());
}
@@ -133,17 +133,17 @@ abstract class ConfiguredObjectAttributeOrStatistic<C extends ConfiguredObject,
{
try
{
- return (T) _getter.invoke(configuredObject);
+ return (T) getGetter().invoke(configuredObject);
}
catch (IllegalAccessException e)
{
- Object o = configuredObject.getAttribute(_name);
- return _converter.convert(o, configuredObject);
+ Object o = configuredObject.getAttribute(getName());
+ return getConverter().convert(o, configuredObject);
}
catch (InvocationTargetException e)
{
- Object o = configuredObject.getAttribute(_name);
- return _converter.convert(o, configuredObject);
+ Object o = configuredObject.getAttribute(getName());
+ return getConverter().convert(o, configuredObject);
}
}
@@ -152,4 +152,9 @@ abstract class ConfiguredObjectAttributeOrStatistic<C extends ConfiguredObject,
{
return _getter;
}
+
+ public AttributeValueConverter<T> getConverter()
+ {
+ return _converter;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index e7b5dd971d..551f27aee2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -23,8 +23,10 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
+import org.apache.qpid.server.message.MessageDestination;
+
@ManagedObject
-public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>
+public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination
{
String STATE = "state";
String ALTERNATE_EXCHANGE = "alternateExchange";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index fed8862782..fb9548c429 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -45,27 +45,35 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
- String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
-
- @ManagedAttribute
+ @ManagedAttribute( automate = true )
Exchange getAlternateExchange();
@ManagedAttribute( automate = true, defaultValue = "NONE" )
ExclusivityPolicy getExclusive();
- @ManagedAttribute
+ @ManagedAttribute( derived = true )
String getOwner();
- @ManagedAttribute
- boolean getNoLocal();
+ @ManagedAttribute( automate = true )
+ boolean isNoLocal();
- @ManagedAttribute
+ @ManagedAttribute( automate = true )
String getMessageGroupKey();
+ @ManagedContextDefault( name = "qpid.broker_default-shared-message-group")
+ String DEFAULT_SHARED_MESSAGE_GROUP = "qpid.no-group";
+
+ @ManagedAttribute( automate = true, defaultValue = "${qpid.broker_default-shared-message-group}")
+ String getMessageGroupDefaultGroup();
+
+ @ManagedContextDefault( name = "queue.maximumDistinctGroups")
+ int DEFAULT_MAXIMUM_DISTINCT_GROUPS = 255;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.maximumDistinctGroups}")
+ int getMaximumDistinctGroups();
- // TODO - this should either be a boolean or maybe an enum
- @ManagedAttribute
+ @ManagedAttribute( automate = true )
boolean isMessageGroupSharedGroups();
@ManagedContextDefault( name = "queue.maximumDeliveryAttempts")
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index be14b284b1..ae2c412077 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -46,7 +46,6 @@ import org.apache.qpid.server.model.GroupProvider;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -139,7 +138,7 @@ public class FileBasedGroupProviderImpl
for (Principal group : groups)
{
Map<String,Object> attrMap = new HashMap<String, Object>();
- UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName());
+ UUID id = UUID.randomUUID();
attrMap.put(Group.ID, id);
attrMap.put(Group.NAME, group.getName());
GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor());
@@ -228,7 +227,7 @@ public class FileBasedGroupProviderImpl
_groupDatabase.createGroup(groupName);
Map<String,Object> attrMap = new HashMap<String, Object>();
- UUID id = UUIDGenerator.generateGroupUUID(getName(),groupName);
+ UUID id = UUID.randomUUID();
attrMap.put(Group.ID, id);
attrMap.put(Group.NAME, groupName);
GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor());
@@ -442,7 +441,7 @@ public class FileBasedGroupProviderImpl
Collection<GroupMember> members = new ArrayList<GroupMember>();
for (Principal principal : usersInGroup)
{
- UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName());
+ UUID id = UUID.randomUUID();
Map<String,Object> attrMap = new HashMap<String, Object>();
attrMap.put(GroupMember.ID,id);
attrMap.put(GroupMember.NAME, principal.getName());
@@ -493,7 +492,7 @@ public class FileBasedGroupProviderImpl
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, getName());
_groupDatabase.addUserToGroup(memberName, getName());
- UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), memberName);
+ UUID id = UUID.randomUUID();
Map<String,Object> attrMap = new HashMap<String, Object>();
attrMap.put(GroupMember.ID,id);
attrMap.put(GroupMember.NAME, memberName);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
index 9a01cd6aeb..0b1409c5b9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
@@ -25,13 +25,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationMethod;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
@@ -52,7 +52,7 @@ public class VirtualHostAliasAdapter extends AbstractConfiguredObject<VirtualHos
private static Map<String, Object> createAttributes(final VirtualHost virtualHost, final Port port)
{
final Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(ID, UUIDGenerator.generateVhostAliasUUID(virtualHost.getName(), port.getName()));
+ attributes.put(ID, UUID.randomUUID());
attributes.put(NAME, virtualHost.getName());
return attributes;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 6a2a6449eb..e31eac77b4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -25,12 +25,10 @@ import java.util.List;
import java.util.Set;
import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.protocol.CapacityChecker;
@@ -60,10 +58,6 @@ public interface AMQQueue<X extends AMQQueue<X>>
long getTotalEnqueuedMessages();
- LifetimePolicy getLifetimePolicy();
-
- String getOwner();
-
VirtualHostImpl getVirtualHost();
public Collection<QueueConsumer<?>> getConsumers();
@@ -118,41 +112,6 @@ public interface AMQQueue<X extends AMQQueue<X>>
void visit(QueueEntryVisitor visitor);
-
- long getAlertThresholdMessageSize();
-
- void setAlertThresholdMessageSize(long value);
-
-
- long getAlertThresholdQueueDepthMessages();
-
- void setAlertThresholdQueueDepthMessages(long value);
-
-
- long getAlertThresholdQueueDepthBytes();
-
- void setAlertThresholdQueueDepthBytes(long value);
-
-
- long getAlertThresholdMessageAge();
-
- void setAlertThresholdMessageAge(final long maximumMessageAge);
-
-
- long getAlertRepeatGap();
-
- void setAlertRepeatGap(long value);
-
-
- long getQueueFlowControlSizeBytes();
-
- void setQueueFlowControlSizeBytes(long capacity);
-
-
- long getQueueFlowResumeSizeBytes();
-
- void setQueueFlowResumeSizeBytes(long flowResumeCapacity);
-
boolean isOverfull();
long clearQueue();
@@ -168,10 +127,6 @@ public interface AMQQueue<X extends AMQQueue<X>>
void stop();
- ExchangeImpl getAlternateExchange();
-
- void setAlternateExchange(ExchangeImpl exchange);
-
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index f6a667e612..faa44b2288 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -18,7 +18,6 @@
*/
package org.apache.qpid.server.queue;
-import java.lang.reflect.Type;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
@@ -27,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -46,7 +44,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -54,6 +51,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
@@ -100,10 +98,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
private static final String QPID_NO_GROUP = "qpid.no-group";
- private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
- // TODO - should make this configurable at the vhost / broker level
- private static final int DEFAULT_MAX_GROUPS = 255;
private static final QueueNotificationListener NULL_NOTIFICATION_LISTENER = new QueueNotificationListener()
{
@Override
@@ -118,11 +113,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final VirtualHostImpl _virtualHost;
private final DeletedChildListener _deletedChildListener = new DeletedChildListener();
- private ExchangeImpl _alternateExchange;
+ @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange")
+ private Exchange _alternateExchange;
- private final QueueEntryList _entries;
-
private final QueueConsumerList _consumerList = new QueueConsumerList();
private volatile QueueConsumer<?> _exclusiveSubscriber;
@@ -203,6 +197,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private LogSubject _logSubject;
+ @ManagedAttributeField
private boolean _noLocal;
private final AtomicBoolean _overfull = new AtomicBoolean(false);
@@ -224,19 +219,37 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private QueueNotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+ @ManagedAttributeField
+ private String _messageGroupKey;
+ @ManagedAttributeField
+ private boolean _messageGroupSharedGroups;
+ @ManagedAttributeField
+ private String _messageGroupDefaultGroup;
+ @ManagedAttributeField
+ private int _maximumDistinctGroups;
+
protected AbstractQueue(VirtualHostImpl virtualHost,
- Map<String, Object> attributes,
- QueueEntryListFactory entryListFactory)
+ Map<String, Object> attributes)
{
super(parentsMap(virtualHost),
attributes, virtualHost.getTaskExecutor());
- _entries = entryListFactory.createQueueEntryList(this);
_virtualHost = virtualHost;
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+ }
+
+ @Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END))
+ {
+ DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), this);
+ }
}
public void validate()
@@ -308,9 +321,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_arguments = Collections.synchronizedMap(arguments);
- _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
-
-
_logSubject = new QueueLogSubject(this);
try
@@ -423,29 +433,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
getEventLogger().message(_logSubject,
- QueueMessages.CREATED(ownerString,
- _entries.getPriorities(),
- ownerString != null,
- getLifetimePolicy() != LifetimePolicy.PERMANENT,
- isDurable(),
- !isDurable(),
- _entries.getPriorities() > 0));
+ getCreatedLogMessage());
- if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
+ if(getMessageGroupKey() != null)
{
- if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
- && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ if(isMessageGroupSharedGroups())
{
- Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
- defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
- this);
+ new DefinedGroupMessageGroupManager(getMessageGroupKey(), getMessageGroupDefaultGroup(), this);
}
else
{
- _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get(
- Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKey(), getMaximumDistinctGroups());
}
}
else
@@ -456,6 +455,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
updateAlertChecks();
}
+ protected LogMessage getCreatedLogMessage()
+ {
+ String ownerString = getOwner();
+ return QueueMessages.CREATED(ownerString,
+ 0,
+ ownerString != null,
+ getLifetimePolicy() != LifetimePolicy.PERMANENT,
+ isDurable(),
+ !isDurable(),
+ false);
+ }
+
private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
{
final Action<Deletable> deleteQueueTask = new Action<Deletable>()
@@ -501,32 +512,35 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
- public void setNoLocal(boolean nolocal)
- {
- _noLocal = nolocal;
- }
-
public boolean isExclusive()
{
return _exclusive != ExclusivityPolicy.NONE;
}
- public ExchangeImpl getAlternateExchange()
+ public Exchange<?> getAlternateExchange()
{
return _alternateExchange;
}
public void setAlternateExchange(ExchangeImpl exchange)
{
- if(_alternateExchange != null)
+ _alternateExchange = exchange;
+ }
+
+ private void postSetAlternateExchange()
+ {
+ if(_alternateExchange instanceof ExchangeImpl)
{
- _alternateExchange.removeReference(this);
+ ((ExchangeImpl)_alternateExchange).addReference(this);
}
- if(exchange != null)
+ }
+
+ private void preSetAlternateExchange()
+ {
+ if(_alternateExchange instanceof ExchangeImpl)
{
- exchange.addReference(this);
+ ((ExchangeImpl)_alternateExchange).removeReference(this);
}
- _alternateExchange = exchange;
}
@@ -712,7 +726,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
consumer.setStateListener(this);
- consumer.setQueueContext(new QueueContext(_entries.getHead()));
+ consumer.setQueueContext(new QueueContext(getEntries().getHead()));
if (!isDeleted())
{
@@ -908,7 +922,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
- final QueueEntry entry = _entries.add(message);
+ final QueueEntry entry = getEntries().add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
@@ -1271,7 +1285,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
protected QueueEntry getOldestQueueEntry()
{
- return _entries.next(_entries.getHead());
+ return getEntries().next(getEntries().getHead());
}
public boolean isDeleted()
@@ -1282,7 +1296,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public List<QueueEntry> getMessagesOnTheQueue()
{
ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = getEntries().iterator();
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
@@ -1344,10 +1358,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- protected QueueEntryList getEntries()
- {
- return _entries;
- }
+ abstract QueueEntryList getEntries();
protected QueueConsumerList getConsumerList()
{
@@ -1410,7 +1421,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
{
ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = getEntries().iterator();
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
@@ -1425,7 +1436,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public void visit(final QueueEntryVisitor visitor)
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = getEntries().iterator();
while(queueListIterator.advance())
{
@@ -1487,7 +1498,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
//Perform ACLs
getVirtualHost().getSecurityManager().authorisePurge(this);
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = getEntries().iterator();
long count = 0;
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
@@ -1605,10 +1616,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
txn.commit();
- if(_alternateExchange != null)
- {
- _alternateExchange.removeReference(this);
- }
+ preSetAlternateExchange();
for (Action<? super AMQQueue> task : _deleteTaskList)
@@ -1834,7 +1842,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
- atTail = (node == null) || (_entries.next(node) == null);
+ atTail = (node == null) || (getEntries().next(node) == null);
}
return atTail || !subActive;
}
@@ -1865,7 +1873,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
QueueEntry lastSeen = context.getLastSeenEntry();
QueueEntry releasedNode = context.getReleasedEntry();
- QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+ QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : getEntries()
+ .next(lastSeen);
boolean expired = false;
while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1887,7 +1896,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
lastSeen = context.getLastSeenEntry();
releasedNode = context.getReleasedEntry();
- node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
+ node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : getEntries().next(
+ lastSeen);
}
return node;
}
@@ -2059,7 +2069,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public void checkMessageStatus()
{
- QueueEntryIterator queueListIterator = _entries.iterator();
+ QueueEntryIterator queueListIterator = getEntries().iterator();
while (queueListIterator.advance())
{
@@ -2098,22 +2108,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _alertRepeatGap;
}
- public void setAlertRepeatGap(long alertRepeatGap)
- {
- _alertRepeatGap = alertRepeatGap;
- }
-
public long getAlertThresholdMessageAge()
{
return _alertThresholdMessageAge;
}
- public void setAlertThresholdMessageAge(long alertThresholdMessageAge)
- {
- _alertThresholdMessageAge = alertThresholdMessageAge;
- updateNotificationCheck(alertThresholdMessageAge, NotificationCheck.MESSAGE_AGE_ALERT);
- }
-
public long getAlertThresholdQueueDepthMessages()
{
return _alertThresholdQueueDepthMessages;
@@ -2139,59 +2138,26 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
- public void setAlertThresholdQueueDepthMessages(final long alertThresholdQueueDepthMessages)
- {
- _alertThresholdQueueDepthMessages = alertThresholdQueueDepthMessages;
- updateNotificationCheck(alertThresholdQueueDepthMessages, NotificationCheck.MESSAGE_COUNT_ALERT);
-
- }
-
public long getAlertThresholdQueueDepthBytes()
{
return _alertThresholdQueueDepthBytes;
}
- // Sets the queue depth, the max queue size
- public void setAlertThresholdQueueDepthBytes(final long alertThresholdQueueDepthBytes)
- {
- _alertThresholdQueueDepthBytes = alertThresholdQueueDepthBytes;
- updateNotificationCheck(alertThresholdQueueDepthBytes, NotificationCheck.QUEUE_DEPTH_ALERT);
-
- }
-
public long getAlertThresholdMessageSize()
{
return _alertThresholdMessageSize;
}
- public void setAlertThresholdMessageSize(final long alertThresholdMessageSize)
- {
- _alertThresholdMessageSize = alertThresholdMessageSize;
- updateNotificationCheck(alertThresholdMessageSize, NotificationCheck.MESSAGE_SIZE_ALERT);
- }
-
public long getQueueFlowControlSizeBytes()
{
return _queueFlowControlSizeBytes;
}
- public void setQueueFlowControlSizeBytes(long queueFlowControlSizeBytes)
- {
- _queueFlowControlSizeBytes = queueFlowControlSizeBytes;
- }
-
public long getQueueFlowResumeSizeBytes()
{
return _queueFlowResumeSizeBytes;
}
- public void setQueueFlowResumeSizeBytes(long queueFlowResumeSizeBytes)
- {
- _queueFlowResumeSizeBytes = queueFlowResumeSizeBytes;
-
- checkCapacity();
- }
-
public boolean isOverfull()
{
return _overfull.get();
@@ -2258,7 +2224,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public List<Long> getMessagesOnTheQueue(int num, int offset)
{
ArrayList<Long> ids = new ArrayList<Long>(num);
- QueueEntryIterator it = _entries.iterator();
+ QueueEntryIterator it = getEntries().iterator();
for (int i = 0; i < offset; i++)
{
it.advance();
@@ -2657,7 +2623,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public boolean getNoLocal()
+ public boolean isNoLocal()
{
return _noLocal;
}
@@ -2665,15 +2631,26 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
public String getMessageGroupKey()
{
- return (String) getAttribute(MESSAGE_GROUP_KEY);
+ return _messageGroupKey;
}
@Override
public boolean isMessageGroupSharedGroups()
{
- return (Boolean) getAttribute(MESSAGE_GROUP_SHARED_GROUPS);
+ return _messageGroupSharedGroups;
}
+ @Override
+ public String getMessageGroupDefaultGroup()
+ {
+ return _messageGroupDefaultGroup;
+ }
+
+ @Override
+ public int getMaximumDistinctGroups()
+ {
+ return _maximumDistinctGroups;
+ }
@Override
public boolean isQueueFlowStopped()
@@ -2728,14 +2705,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
try
{
- if(ALTERNATE_EXCHANGE.equals(name))
- {
- // In future we may want to accept a UUID as an alternative way to identifying the exchange
- ExchangeImpl alternateExchange = (ExchangeImpl) desired;
- setAlternateExchange(alternateExchange);
- return true;
- }
- else if(EXCLUSIVE.equals(name))
+ if(EXCLUSIVE.equals(name))
{
ExclusivityPolicy existingPolicy = getExclusive();
if(super.changeAttribute(name, expected, desired))
@@ -2776,54 +2746,41 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_virtualHost.getSecurityManager().authoriseUpdate(this);
}
- @SuppressWarnings("serial")
- static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
- put(ALERT_REPEAT_GAP, Long.class);
- put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
- put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
- put(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class);
- put(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class);
- put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
- put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
- put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
- put(DESCRIPTION, String.class);
- }});
+ private static final String[] NON_NEGATIVE_NUMBERS = {
+ ALERT_REPEAT_GAP,
+ ALERT_THRESHOLD_MESSAGE_AGE,
+ ALERT_THRESHOLD_MESSAGE_SIZE,
+ ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+ QUEUE_FLOW_CONTROL_SIZE_BYTES,
+ QUEUE_FLOW_RESUME_SIZE_BYTES,
+ MAXIMUM_DELIVERY_ATTEMPTS
+ };
@Override
- protected void changeAttributes(final Map<String, Object> attributes)
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
- Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
- validateAttributes(convertedAttributes);
-
- super.changeAttributes(convertedAttributes);
- }
-
- private void validateAttributes(Map<String, Object> convertedAttributes)
- {
- Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES);
- Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_RESUME_SIZE_BYTES);
- if (queueFlowControlSize != null || queueFlowControlResumeSize != null )
+ super.validateChange(proxyForValidation, changedAttributes);
+ Queue<?> queue = (Queue) proxyForValidation;
+ long queueFlowControlSize = queue.getQueueFlowControlSizeBytes();
+ long queueFlowControlResumeSize = queue.getQueueFlowResumeSizeBytes();
+ if (queueFlowControlResumeSize > queueFlowControlSize)
{
- if (queueFlowControlSize == null)
- {
- queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES);
- }
- if (queueFlowControlResumeSize == null)
- {
- queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES);
- }
- if (queueFlowControlResumeSize > queueFlowControlSize)
- {
- throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
- }
+ throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
}
- for (Map.Entry<String, Object> entry: convertedAttributes.entrySet())
+
+
+ for (String attrName : NON_NEGATIVE_NUMBERS)
{
- Object value = entry.getValue();
- if (value instanceof Number && ((Number)value).longValue() < 0)
+ if (changedAttributes.contains(attrName))
{
- throw new IllegalConfigurationException("Only positive integer value can be specified for the attribute "
- + entry.getKey());
+ Object value = queue.getAttribute(attrName);
+ if (!(value instanceof Number) || ((Number) value).longValue() < 0)
+ {
+ throw new IllegalConfigurationException(
+ "Only positive integer value can be specified for the attribute "
+ + attrName);
+ }
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
index c97a789f65..a9c1fec246 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
@ManagedObject( category = false, type="lvq" )
@@ -28,6 +29,9 @@ public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X>
{
String LVQ_KEY = "lvqKey";
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.lvqKey" )
+ String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+
+ @ManagedAttribute(automate = true, defaultValue = "${queue.lvqKey}")
String getLvqKey();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java
new file mode 100644
index 0000000000..2e34d0fc3e
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class LastValueQueueFactory extends AbstractConfiguredObjectTypeFactory<LastValueQueueImpl>
+{
+ public LastValueQueueFactory()
+ {
+ super(LastValueQueueImpl.class);
+ }
+
+ @Override
+ protected LastValueQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+
+ return new LastValueQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
index 079f04f92f..d888637d80 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
@@ -23,52 +23,39 @@ package org.apache.qpid.server.queue;
import java.util.Map;
-import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implements LastValueQueue<LastValueQueueImpl>
{
- public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+ private LastValueQueueList _entries;
+
+ @ManagedAttributeField
+ private String _lvqKey;
public LastValueQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
- super(virtualHost, attributes, entryList(attributes));
+ super(virtualHost, attributes);
}
- private static LastValueQueueList.Factory entryList(final Map<String, Object> attributes)
- {
-
- String conflationKey = MapValueConverter.getStringAttribute(LVQ_KEY,
- attributes,
- DEFAULT_LVQ_KEY);
-
- // conflation key can still be null if it was present in the map with a null value
- return new LastValueQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey);
- }
-
- public String getConflationKey()
+ @Override
+ protected void onOpen()
{
- return ((LastValueQueueList)getEntries()).getConflationKey();
+ super.onOpen();
+ _entries = new LastValueQueueList(this);
}
@Override
- public Object getAttribute(final String name)
+ LastValueQueueList getEntries()
{
- if(LVQ_KEY.equals(name))
- {
- if(this instanceof LastValueQueueImpl)
- {
- return getConflationKey();
- }
- }
- return super.getAttribute(name);
+ return _entries;
}
@Override
public String getLvqKey()
{
- return getConflationKey();
+ return _lvqKey;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
index cd586e9629..6f1edf12e5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
@@ -54,10 +54,10 @@ public class LastValueQueueList extends OrderedQueueEntryList
private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
- public LastValueQueueList(LastValueQueueImpl queue, String conflationKey)
+ public LastValueQueueList(LastValueQueueImpl queue)
{
super(queue, HEAD_CREATOR);
- _conflationKey = conflationKey;
+ _conflationKey = queue.getLvqKey();
}
private ConflationQueueEntry createHead()
@@ -65,11 +65,6 @@ public class LastValueQueueList extends OrderedQueueEntryList
return new ConflationQueueEntry(this);
}
- public String getConflationKey()
- {
- return _conflationKey;
- }
-
@Override
protected ConflationQueueEntry createQueueEntry(ServerMessage message)
{
@@ -254,20 +249,4 @@ public class LastValueQueueList extends OrderedQueueEntryList
{
return Collections.unmodifiableMap(_latestValuesMap);
}
-
- static class Factory implements QueueEntryListFactory
- {
- private final String _conflationKey;
-
- Factory(String conflationKey)
- {
- _conflationKey = conflationKey;
- }
-
- @Override
- public LastValueQueueList createQueueEntryList(final AMQQueue<?> queue)
- {
- return new LastValueQueueList((LastValueQueueImpl)queue, _conflationKey);
- }
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index 0797bbd4e9..39317f455d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -28,10 +28,9 @@ public abstract class OutOfOrderQueue<X extends OutOfOrderQueue<X>> extends Abst
{
protected OutOfOrderQueue(VirtualHostImpl virtualHost,
- Map<String, Object> attributes,
- QueueEntryListFactory entryListFactory)
+ Map<String, Object> attributes)
{
- super(virtualHost, attributes, entryListFactory);
+ super(virtualHost, attributes);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
index 8e8732d595..87693b793d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
@ManagedObject( category = false, type="priority" )
@@ -28,6 +29,9 @@ public interface PriorityQueue<X extends PriorityQueue<X>> extends AMQQueue<X>
{
String PRIORITIES = "priorities";
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.priorities")
+ int DEFAULT_PRIORITY_LEVELS = 10;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.priorities}")
int getPriorities();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java
new file mode 100644
index 0000000000..f7a46d5bef
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class PriorityQueueFactory extends AbstractConfiguredObjectTypeFactory<PriorityQueueImpl>
+{
+ public PriorityQueueFactory()
+ {
+ super(PriorityQueueImpl.class);
+ }
+
+ @Override
+ protected PriorityQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+
+ return new PriorityQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
index 89d542aded..20759797d2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java
@@ -22,43 +22,55 @@ package org.apache.qpid.server.queue;
import java.util.Map;
-import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implements PriorityQueue<PriorityQueueImpl>
{
- public static final int DEFAULT_PRIORITY_LEVELS = 10;
+ private PriorityQueueList _entries;
+
+ @ManagedAttributeField
+ private int _priorities;
public PriorityQueueImpl(VirtualHostImpl virtualHost,
- Map<String, Object> attributes)
+ Map<String, Object> attributes)
{
- super(virtualHost, attributes, entryList(attributes));
+ super(virtualHost, attributes);
}
- private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes)
+ @Override
+ protected void onOpen()
{
- final Integer priorities = MapValueConverter.getIntegerAttribute(PRIORITIES, attributes,
- DEFAULT_PRIORITY_LEVELS);
-
- return new PriorityQueueList.Factory(priorities);
+ super.onOpen();
+ _entries = PriorityQueueList.newInstance(this);
}
@Override
public int getPriorities()
{
- return getEntries().getPriorities();
+ return _priorities;
}
@Override
- public Object getAttribute(final String name)
+ PriorityQueueList getEntries()
{
+ return _entries;
+ }
- if(PRIORITIES.equals(name))
- {
- return getPriorities();
- }
-
- return super.getAttribute(name);
+ protected LogMessage getCreatedLogMessage()
+ {
+ String ownerString = getOwner();
+ return QueueMessages.CREATED(ownerString,
+ getPriorities(),
+ ownerString != null,
+ getLifetimePolicy() != LifetimePolicy.PERMANENT,
+ isDurable(),
+ !isDurable(),
+ true);
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index f72c8cd57a..076b6c9e73 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -26,6 +26,11 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
{
+ public static PriorityQueueList newInstance(PriorityQueueImpl queue)
+ {
+ return new PriorityQueueMasterList(queue, queue.getPriorities());
+ }
+
public PriorityQueueList(final PriorityQueueImpl queue,
final HeadCreator headCreator)
{
@@ -185,20 +190,6 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
}
}
- static class Factory implements QueueEntryListFactory
- {
- private final int _priorities;
-
- Factory(int priorities)
- {
- _priorities = priorities;
- }
-
- public PriorityQueueList createQueueEntryList(AMQQueue<?> queue)
- {
- return new PriorityQueueMasterList((PriorityQueueImpl) queue, _priorities);
- }
- }
static class PriorityQueueEntrySubList extends PriorityQueueList
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index c1687de86e..15df952e61 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
public class QueueArgumentsConverter
{
@@ -84,7 +85,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
- ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, Queue.CREATE_DLQ_ON_CREATION);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
//ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS);
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
@@ -108,7 +109,7 @@ public class QueueArgumentsConverter
}
if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
{
- modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY);
+ modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueue.DEFAULT_LVQ_KEY);
}
if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
{
@@ -117,7 +118,7 @@ public class QueueArgumentsConverter
}
if(wireArguments.get(X_QPID_DLQ_ENABLED) != null)
{
- modelArguments.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString()));
+ modelArguments.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString()));
}
if(wireArguments.get(QPID_NO_LOCAL) != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index b62e100eea..6fa7801608 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,29 +20,29 @@
*/
package org.apache.qpid.server.queue;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.EnumMap;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
public abstract class QueueEntryImpl implements QueueEntry
{
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
@@ -365,7 +365,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
- ExchangeImpl alternateExchange = currentQueue.getAlternateExchange();
+ Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
boolean autocommit = txn == null;
int enqueues;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
deleted file mode 100644
index 3f701f652e..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-interface QueueEntryListFactory
-{
- public QueueEntryList createQueueEntryList(AMQQueue<?> queue);
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
new file mode 100644
index 0000000000..e56ff20c67
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
+
+public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFactory<X>
+{
+ private ConfiguredObjectFactory _configuredObjectFactory;
+
+ @Override
+ public Class<? super X> getCategoryClass()
+ {
+ return Queue.class;
+ }
+
+ @Override
+ public X create(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+ {
+ return getQueueFactory(attributes).create(attributes, parents);
+ }
+
+ @Override
+ public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectRecord record,
+ final ConfiguredObject<?>... parents)
+ {
+ return getQueueFactory(record.getAttributes()).recover(record, parents);
+ }
+
+ private ConfiguredObjectTypeFactory<X> getQueueFactory(Map<String, Object> attributes)
+ {
+
+ String type;
+
+ if(attributes.containsKey(Port.TYPE))
+ {
+ type = (String) attributes.get(Port.TYPE);
+ }
+ else
+ {
+ if(attributes.containsKey(PriorityQueue.PRIORITIES))
+ {
+ type = "priority";
+ }
+ else if(attributes.containsKey(SortedQueue.SORT_KEY))
+ {
+ type = "sorted";
+ }
+ else if(attributes.containsKey(LastValueQueue.LVQ_KEY))
+ {
+ type = "lvq";
+ }
+ else
+ {
+ type = "standard";
+ }
+ }
+
+ synchronized (this)
+ {
+ if(_configuredObjectFactory == null)
+ {
+ _configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
+ }
+ }
+ return _configuredObjectFactory.getConfiguredObjectTypeFactory(Queue.class.getSimpleName(), type);
+ }
+
+ @Override
+ public String getType()
+ {
+ return null;
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 19f9f6c427..a91e1e8d58 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -28,7 +28,7 @@ public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X>
{
String SORT_KEY = "sortKey";
- @ManagedAttribute
+ @ManagedAttribute( automate = true, mandatory = true )
String getSortKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index 92abe30442..efb5438214 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -39,11 +39,11 @@ public class SortedQueueEntryList implements QueueEntryList
private final SortedQueueImpl _queue;
private final String _propertyName;
- public SortedQueueEntryList(final SortedQueueImpl queue, final String propertyName)
+ public SortedQueueEntryList(final SortedQueueImpl queue)
{
_queue = queue;
_head = new SortedQueueEntry(this);
- _propertyName = propertyName;
+ _propertyName = queue.getSortKey();
}
public SortedQueueImpl getQueue()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
deleted file mode 100644
index 5cab126374..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-public class SortedQueueEntryListFactory implements QueueEntryListFactory
-{
-
- private final String _propertyName;
-
- public SortedQueueEntryListFactory(final String propertyName)
- {
- _propertyName = propertyName;
- }
-
- @Override
- public SortedQueueEntryList createQueueEntryList(final AMQQueue<?> queue)
- {
- return new SortedQueueEntryList((SortedQueueImpl) queue, _propertyName);
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java
new file mode 100644
index 0000000000..484e61efa9
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class SortedQueueFactory extends AbstractConfiguredObjectTypeFactory<SortedQueueImpl>
+{
+ public SortedQueueFactory()
+ {
+ super(SortedQueueImpl.class);
+ }
+
+ @Override
+ protected SortedQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+
+ return new SortedQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index c495b09d4a..d0bb79900d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -23,8 +23,8 @@ import java.util.Map;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements SortedQueue<SortedQueueImpl>
@@ -33,30 +33,22 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements
//monitor to prevent lock order issues with consumer sendLocks
//and consumer updates in the super classes
private final Object _sortedQueueLock = new Object();
- private final String _sortedPropertyName;
-
- protected SortedQueueImpl(VirtualHostImpl virtualHost,
- Map<String, Object> attributes,
- QueueEntryListFactory factory)
- {
- super(virtualHost, attributes, factory);
- _sortedPropertyName = MapValueConverter.getStringAttribute(SORT_KEY,attributes);
- }
+ @ManagedAttributeField
+ private String _sortKey;
+ private SortedQueueEntryList _entries;
public SortedQueueImpl(VirtualHostImpl virtualHost,
- Map<String, Object> attributes)
+ Map<String, Object> attributes)
{
- this(virtualHost,
- attributes,
- new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(SORT_KEY, attributes)));
+ super(virtualHost, attributes);
}
-
-
- public String getSortedPropertyName()
+ @Override
+ protected void onOpen()
{
- return _sortedPropertyName;
+ super.onOpen();
+ _entries = new SortedQueueEntryList(this);
}
@Override
@@ -70,20 +62,14 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements
}
@Override
- public Object getAttribute(final String name)
+ SortedQueueEntryList getEntries()
{
-
- if(SORT_KEY.equals(name))
- {
- return getSortedPropertyName();
- }
-
- return super.getAttribute(name);
+ return _entries;
}
@Override
public String getSortKey()
{
- return getSortedPropertyName();
+ return _sortKey;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
index 341e2365c3..487bac1a43 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
@@ -45,13 +45,4 @@ public class StandardQueueEntryList extends OrderedQueueEntryList
return new StandardQueueEntry(this, message);
}
- static class Factory implements QueueEntryListFactory
- {
-
- public StandardQueueEntryList createQueueEntryList(AMQQueue<?> queue)
- {
- return new StandardQueueEntryList((StandardQueueImpl) queue);
- }
- }
-
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java
new file mode 100644
index 0000000000..1e2d8f63a6
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class StandardQueueFactory extends AbstractConfiguredObjectTypeFactory<StandardQueueImpl>
+{
+ public StandardQueueFactory()
+ {
+ super(StandardQueueImpl.class);
+ }
+
+ @Override
+ protected StandardQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+
+ return new StandardQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
index 4dc57b14a7..19c808a2f2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
@@ -26,9 +26,24 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class StandardQueueImpl extends AbstractQueue<StandardQueueImpl> implements StandardQueue<StandardQueueImpl>
{
+ private StandardQueueEntryList _entries;
+
public StandardQueueImpl(final VirtualHostImpl virtualHost,
final Map<String, Object> arguments)
{
- super(virtualHost, arguments, new StandardQueueEntryList.Factory());
+ super(virtualHost, arguments);
+ }
+
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ _entries = new StandardQueueEntryList(this);
+ }
+
+ @Override
+ StandardQueueEntryList getEntries()
+ {
+ return _entries;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index cb3729e4e3..a532e2a749 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.login.AccountNotFoundException;
@@ -48,7 +49,6 @@ import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.User;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -531,7 +531,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
private static Map<String, Object> createPrincipalAttributes(PrincipalDatabaseAuthenticationManager manager, final Principal user)
{
final Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(ID, UUIDGenerator.generateUserUUID(manager.getName(), user.getName()));
+ attributes.put(ID, UUID.randomUUID());
attributes.put(NAME, user.getName());
return attributes;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
index 154b548cab..75fe2a6642 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
@@ -22,44 +22,84 @@ package org.apache.qpid.server.store;
import java.util.UUID;
-public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements DurableConfiguredObjectRecoverer
+import org.apache.qpid.server.model.ConfiguredObject;
+
+public abstract class AbstractDurableConfiguredObjectRecoverer<T extends ConfiguredObject> implements DurableConfiguredObjectRecoverer
{
@Override
public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
final ConfiguredObjectRecord record)
{
- final UnresolvedObject obj = createUnresolvedObject(record);
+ final UnresolvedObject<T> obj = createUnresolvedObject(record);
UnresolvedDependency[] dependencies = obj.getUnresolvedDependencies();
for(final UnresolvedDependency dependency : dependencies)
{
- Object dep;
- if((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getId())) != null)
+ if(dependency.getId() != null)
{
- dependency.resolve(dep);
+ Object dep;
+ if ((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getId()))
+ != null)
+ {
+ dependency.resolve(dep);
+ }
+ else
+ {
+ durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(),
+ null, new DependencyListener()
+ {
+
+ @Override
+ public void dependencyResolved(final String depType,
+ final UUID depId,
+ final ConfiguredObject o)
+ {
+ dependency.resolve(o);
+ if (obj.getUnresolvedDependencies().length
+ == 0)
+ {
+ durableConfigurationRecoverer.resolve(
+ getType(),
+ record.getId(),
+ obj.resolve());
+ }
+ }
+ }
+ );
+ }
}
else
{
- durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(),
- new DependencyListener()
- {
+ Object dep;
+
+ if ((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getName()))
+ != null)
+ {
+ dependency.resolve(dep);
+ }
+ else
+ {
+ durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(),
+ dependency.getName(), new DependencyListener()
+ {
- @Override
- public void dependencyResolved(final String depType,
- final UUID depId,
- final Object o)
- {
- dependency.resolve(o);
- if (obj.getUnresolvedDependencies().length
- == 0)
- {
- durableConfigurationRecoverer.resolve(
- getType(),
- record.getId(),
- obj.resolve());
- }
- }
- }
- );
+ @Override
+ public void dependencyResolved(final String depType,
+ final UUID depId,
+ final ConfiguredObject o)
+ {
+ dependency.resolve(o);
+ if (obj.getUnresolvedDependencies().length
+ == 0)
+ {
+ durableConfigurationRecoverer.resolve(
+ getType(),
+ record.getId(),
+ obj.resolve());
+ }
+ }
+ }
+ );
+ }
}
}
if(obj.getUnresolvedDependencies().length == 0)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java
index 120c904cf7..2962e63152 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java
@@ -22,7 +22,9 @@ package org.apache.qpid.server.store;
import java.util.UUID;
+import org.apache.qpid.server.model.ConfiguredObject;
+
interface DependencyListener
{
- void dependencyResolved(String type, UUID id, Object o);
+ void dependencyResolved(String type, UUID id, ConfiguredObject o);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
index f8a8741a37..507e14f021 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
@@ -29,17 +29,19 @@ import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandler
{
private static final Logger _logger = Logger.getLogger(DurableConfigurationRecoverer.class);
- private final Map<String, Map<UUID, Object>> _resolvedObjects = new HashMap<String, Map<UUID, Object>>();
+ private final Map<String, Map<UUID, ConfiguredObject>> _resolvedObjects = new HashMap<String, Map<UUID, ConfiguredObject>>();
private final Map<String, Map<UUID, UnresolvedObject>> _unresolvedObjects =
new HashMap<String, Map<UUID, UnresolvedObject>>();
@@ -47,6 +49,9 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
private final Map<String, Map<UUID, List<DependencyListener>>> _dependencyListeners =
new HashMap<String, Map<UUID, List<DependencyListener>>>();
+ private final Map<String, Map<String, List<DependencyListener>>> _dependencyNameListeners =
+ new HashMap<String, Map<String, List<DependencyListener>>>();
+
private final Map<String, DurableConfiguredObjectRecoverer> _recoverers;
private final UpgraderProvider _upgraderProvider;
private final EventLogger _eventLogger;
@@ -188,36 +193,72 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
void addResolutionListener(final String type,
final UUID id,
+ final String name,
final DependencyListener dependencyListener)
{
- Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type);
- if(typeListeners == null)
+ if(id != null)
{
- typeListeners = new HashMap<UUID, List<DependencyListener>>();
- _dependencyListeners.put(type, typeListeners);
+ Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type);
+ if (typeListeners == null)
+ {
+ typeListeners = new HashMap<UUID, List<DependencyListener>>();
+ _dependencyListeners.put(type, typeListeners);
+ }
+ List<DependencyListener> objectListeners = typeListeners.get(id);
+ if (objectListeners == null)
+ {
+ objectListeners = new ArrayList<DependencyListener>();
+ typeListeners.put(id, objectListeners);
+ }
+ objectListeners.add(dependencyListener);
}
- List<DependencyListener> objectListeners = typeListeners.get(id);
- if(objectListeners == null)
+ else
{
- objectListeners = new ArrayList<DependencyListener>();
- typeListeners.put(id, objectListeners);
+ Map<String, List<DependencyListener>> typeListeners = _dependencyNameListeners.get(type);
+ if (typeListeners == null)
+ {
+ typeListeners = new HashMap<String, List<DependencyListener>>();
+ _dependencyNameListeners.put(type, typeListeners);
+ }
+ List<DependencyListener> objectListeners = typeListeners.get(name);
+ if (objectListeners == null)
+ {
+ objectListeners = new ArrayList<DependencyListener>();
+ typeListeners.put(name, objectListeners);
+ }
+ objectListeners.add(dependencyListener);
}
- objectListeners.add(dependencyListener);
-
}
Object getResolvedObject(final String type, final UUID id)
{
- Map<UUID, Object> objects = _resolvedObjects.get(type);
+ Map<UUID, ConfiguredObject> objects = _resolvedObjects.get(type);
return objects == null ? null : objects.get(id);
}
- void resolve(final String type, final UUID id, final Object object)
+ Object getResolvedObject(final String type, final String name)
+ {
+ Map<UUID, ConfiguredObject> objects = _resolvedObjects.get(type);
+ if(objects != null)
+ {
+ for (ConfiguredObject object : objects.values())
+ {
+ if(object.getName().equals(name))
+ {
+ return object;
+ }
+ }
+ }
+ return null;
+
+ }
+
+ void resolve(final String type, final UUID id, final ConfiguredObject object)
{
- Map<UUID, Object> typeObjects = _resolvedObjects.get(type);
+ Map<UUID, ConfiguredObject> typeObjects = _resolvedObjects.get(type);
if(typeObjects == null)
{
- typeObjects = new HashMap<UUID, Object>();
+ typeObjects = new HashMap<UUID, ConfiguredObject>();
_resolvedObjects.put(type, typeObjects);
}
typeObjects.put(id, object);
@@ -239,6 +280,19 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
}
}
}
+
+ Map<String, List<DependencyListener>> typeNameListeners = _dependencyNameListeners.get(type);
+ if(typeNameListeners != null)
+ {
+ List<DependencyListener> listeners = typeNameListeners.remove(object.getName());
+ if(listeners != null)
+ {
+ for(DependencyListener listener : listeners)
+ {
+ listener.dependencyResolved(type, id, object);
+ }
+ }
+ }
}
void addUnresolvedObject(final String type,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index 9410006d65..e7f380660c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -22,14 +22,11 @@ package org.apache.qpid.server.store;
import java.util.Arrays;
import java.util.HashSet;
-import java.util.Map;
-
import java.util.Set;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
@@ -44,26 +41,12 @@ public class DurableConfigurationStoreHelper
public static void updateQueue(DurableConfigurationStore store, AMQQueue queue)
{
- Map<String, Object> attributesMap = queue.getActualAttributes();
- attributesMap.remove(ConfiguredObject.ID);
- if(queue.getAlternateExchange() != null)
- {
- attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
- }
- store.update(false, new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap));
+ store.update(false, queue.asObjectRecord());
}
public static void createQueue(DurableConfigurationStore store, AMQQueue<?> queue)
{
-
- Map<String, Object> attributesMap = queue.getActualAttributes();
- attributesMap.remove(ConfiguredObject.ID);
- if(queue.getAlternateExchange() != null)
- {
- attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
- }
-
- store.create(new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap));
+ store.create(queue.asObjectRecord());
}
public static void removeQueue(DurableConfigurationStore store, AMQQueue queue)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java
index 98348efbd2..29991054fd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java
@@ -25,6 +25,7 @@ import java.util.UUID;
public interface UnresolvedDependency<T>
{
public UUID getId();
+ public String getName();
public String getType();
public void resolve(final T dependency);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 0484841aa9..b770fab698 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -70,13 +70,6 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.LastValueQueue;
-import org.apache.qpid.server.queue.LastValueQueueImpl;
-import org.apache.qpid.server.queue.PriorityQueue;
-import org.apache.qpid.server.queue.PriorityQueueImpl;
-import org.apache.qpid.server.queue.SortedQueue;
-import org.apache.qpid.server.queue.SortedQueueImpl;
-import org.apache.qpid.server.queue.StandardQueueImpl;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -101,6 +94,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
public static final String DLQ_ROUTING_KEY = "dlq";
+ public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
private static final int MAX_LENGTH = 255;
private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class);
@@ -195,7 +189,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if(attributes.get(ID) == null)
{
attributes = new HashMap<String, Object>(attributes);
- attributes.put(ID, UUIDGenerator.generateVhostUUID((String)attributes.get(NAME)));
+ attributes.put(ID, UUID.randomUUID());
}
return attributes;
}
@@ -608,6 +602,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException
{
checkVHostStateIsActive();
+
AMQQueue<?> queue = addQueue(attributes);
childAdded(queue);
return queue;
@@ -615,51 +610,34 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private AMQQueue<?> addQueue(Map<String, Object> attributes) throws QueueExistsException
{
-
- // make a copy as we may augment (with an ID for example)
- attributes = new LinkedHashMap<String, Object>(attributes);
- if (attributes.containsKey(Queue.TYPE))
+ if (shouldCreateDLQ(attributes))
{
- String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null);
- QueueType queueType = null;
- try
- {
- queueType = QueueType.valueOf(typeAttribute.toUpperCase());
- }
- catch(Exception e)
- {
- throw new IllegalArgumentException("Unsupported queue type :" + typeAttribute);
- }
- if (queueType == QueueType.LVQ && attributes.get(LastValueQueue.LVQ_KEY) == null)
- {
- attributes.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY);
- }
- else if (queueType == QueueType.PRIORITY && attributes.get(PriorityQueue.PRIORITIES) == null)
- {
- attributes.put(PriorityQueue.PRIORITIES, 10);
- }
- else if (queueType == QueueType.SORTED && attributes.get(SortedQueue.SORT_KEY) == null)
- {
- throw new IllegalArgumentException("Sort key is not specified for sorted queue");
- }
+ // TODO - this isn't really correct - what if the name has ${foo} in it?
+ String queueName = String.valueOf(attributes.get(Queue.NAME));
+ validateDLNames(queueName);
+ String altExchangeName = createDLQ(queueName);
+ attributes = new LinkedHashMap<String, Object>(attributes);
+ attributes.put(Queue.ALTERNATE_EXCHANGE, altExchangeName);
}
+ return addQueueWithoutDLQ(attributes);
+ }
+
+ private AMQQueue<?> addQueueWithoutDLQ(Map<String, Object> attributes) throws QueueExistsException
+ {
+ Broker<?> broker = getParent(Broker.class);
- if(!attributes.containsKey(Queue.ID))
+ ConfiguredObjectTypeFactory<? extends Queue> factory =
+ broker.getObjectFactory().getConfiguredObjectTypeFactory(Queue.class, attributes);
+
+ try
{
- UUID id = UUID.randomUUID();
- attributes.put(Queue.ID, id);
+ return (AMQQueue) factory.create(attributes, this);
}
-
- boolean createDLQ = shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled());
- if (createDLQ)
+ catch (DuplicateNameException e)
{
- // TODO - this isn't really correct - what if the name has ${foo} in it?
- validateDLNames(String.valueOf(attributes.get(Queue.NAME)));
+ throw new QueueExistsException(getQueue(e.getName()));
}
- return createOrRestoreQueue(attributes, true);
-
-
}
@@ -1525,94 +1503,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
- // TODO - remove
- public AMQQueue restoreQueue(Map<String, Object> attributes)
- {
- return createOrRestoreQueue(attributes, false);
-
- }
-
-
- private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore)
- {
- String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes);
- boolean createDLQ = createInStore && shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled());
- if (createDLQ)
- {
- validateDLNames(queueName);
- }
-
- AMQQueue queue;
-
- try
- {
-
-
- if (attributes.containsKey(SortedQueue.SORT_KEY))
- {
- queue = new SortedQueueImpl(this, attributes);
- }
- else if (attributes.containsKey(LastValueQueue.LVQ_KEY))
- {
- queue = new LastValueQueueImpl(this, attributes);
- }
- else if (attributes.containsKey(PriorityQueue.PRIORITIES))
- {
- queue = new PriorityQueueImpl(this, attributes);
- }
- else
- {
- queue = new StandardQueueImpl(this, attributes);
- }
- queue.open();
- }
- catch(DuplicateNameException e)
- {
-
- throw new QueueExistsException(e.getName(), getQueue(e.getName()));
- }
-
- if(createDLQ)
- {
- createDLQ(queue);
- }
- else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
- {
-
- final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE);
- ExchangeImpl altExchange;
- try
- {
- altExchange = getExchange(UUID.fromString(altExchangeAttr));
- }
- catch(IllegalArgumentException e)
- {
- altExchange = getExchange(altExchangeAttr);
- }
- queue.setAlternateExchange(altExchange);
- }
-
- if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
- || queue.getLifetimePolicy()
- == LifetimePolicy.DELETE_ON_SESSION_END))
- {
- DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
- }
-
- return queue;
- }
-
-
-
- private void createDLQ(final AMQQueue queue)
+ private String createDLQ(final String queueName)
{
- final String queueName = queue.getName();
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
ExchangeImpl dlExchange = null;
- final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, getName());
+ final UUID dlExchangeId = UUID.randomUUID();
try
{
@@ -1654,7 +1551,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
//set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
final Map<String, Object> args = new HashMap<String, Object>();
- args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(CREATE_DLQ_ON_CREATION, false);
args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
try
@@ -1664,7 +1561,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
args.put(Queue.ID, UUID.randomUUID());
args.put(Queue.NAME, dlQueueName);
args.put(Queue.DURABLE, true);
- dlQueue = createQueue(args);
+ dlQueue = addQueueWithoutDLQ(args);
+ childAdded(dlQueue);
}
catch (QueueExistsException e)
{
@@ -1681,7 +1579,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
//but we will make the key 'dlq' as it can be logged at creation.
dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null);
}
- queue.setAlternateExchange(dlExchange);
+ return dlExchangeName;
}
private static void validateDLNames(String name)
@@ -1701,8 +1599,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- private static boolean shouldCreateDLQ(Map<String, Object> arguments, boolean virtualHostDefaultDeadLetterQueueEnabled)
+ private boolean shouldCreateDLQ(Map<String, Object> arguments)
{
+
boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
Queue.LIFETIME_POLICY,
arguments,
@@ -1712,19 +1611,19 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
{
boolean dlqArgumentPresent = arguments != null
- && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
+ && arguments.containsKey(CREATE_DLQ_ON_CREATION);
if (dlqArgumentPresent)
{
boolean dlqEnabled = true;
if (dlqArgumentPresent)
{
- Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION);
+ Object argument = arguments.get(CREATE_DLQ_ON_CREATION);
dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
|| (argument instanceof String && Boolean.parseBoolean(argument.toString()));
}
return dlqEnabled;
}
- return virtualHostDefaultDeadLetterQueueEnabled;
+ return isQueue_deadLetterQueueEnabled();
}
return false;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index a976db05f6..6e399d950e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -123,6 +123,12 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
}
@Override
+ public String getName()
+ {
+ return null;
+ }
+
+ @Override
public String getType()
{
return Queue.class.getSimpleName();
@@ -147,6 +153,12 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
}
@Override
+ public String getName()
+ {
+ return null;
+ }
+
+ @Override
public String getType()
{
return org.apache.qpid.server.model.Exchange.class.getSimpleName();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index 4bf7635513..9347b97606 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -28,7 +28,6 @@ import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -46,8 +45,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
{
_vhost = vhost;
Broker<?> broker = _vhost.getParent(Broker.class);
- SystemContext<?> systemContext = broker.getParent(SystemContext.class);
- _objectFactory = systemContext.getObjectFactory();
+ _objectFactory = broker.getObjectFactory();
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java
index 5bec8c1457..ef9046e54e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java
@@ -26,6 +26,11 @@ public class QueueExistsException extends RuntimeException
{
private final AMQQueue _existing;
+ public QueueExistsException(AMQQueue existing)
+ {
+ this(existing.getName(), existing);
+ }
+
public QueueExistsException(String name, AMQQueue existing)
{
super(name);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index 1841f1e45f..9606a44acb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.virtualhost;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -29,21 +29,28 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQQueue>
{
private static final Logger _logger = Logger.getLogger(QueueRecoverer.class);
- private final VirtualHostImpl _virtualHost;
+ private final VirtualHostImpl<?,?,?> _virtualHost;
+ private final ConfiguredObjectFactory _objectFactory;
public QueueRecoverer(final VirtualHostImpl virtualHost)
{
_virtualHost = virtualHost;
+ Broker<?> broker = _virtualHost.getParent(Broker.class);
+ _objectFactory = broker.getObjectFactory();
}
@Override
@@ -55,33 +62,42 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
@Override
public UnresolvedObject<AMQQueue> createUnresolvedObject(final ConfiguredObjectRecord record)
{
- return new UnresolvedQueue(record.getId(), record.getAttributes());
+ return new UnresolvedQueue(record);
}
private class UnresolvedQueue implements UnresolvedObject<AMQQueue>
{
- private final Map<String, Object> _attributes;
- private final UUID _alternateExchangeId;
- private final UUID _id;
+
+ // private final UUID _alternateExchangeId;
+ private final ConfiguredObjectRecord _record;
private AMQQueue _queue;
private List<UnresolvedDependency> _dependencies = new ArrayList<UnresolvedDependency>();
private ExchangeImpl _alternateExchange;
+ private UUID _alternateExchangeId;
+ private String _alternateExchangeName;
- public UnresolvedQueue(final UUID id,
- final Map<String, Object> attributes)
+ public UnresolvedQueue(ConfiguredObjectRecord record)
{
- _attributes = attributes;
- _alternateExchangeId = _attributes.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String) _attributes
- .get(Queue.ALTERNATE_EXCHANGE));
- _id = id;
- if (_alternateExchangeId != null)
+ _record = record;
+ Object altExchObj = record.getAttributes().get(Queue.ALTERNATE_EXCHANGE);
+ if(altExchObj instanceof UUID)
+ {
+ _alternateExchangeId = (UUID) altExchObj;
+ _dependencies.add(new AlternateExchangeDependency());
+ }
+ else if (altExchObj instanceof String)
{
- _alternateExchange = _virtualHost.getExchange(_alternateExchangeId);
- if(_alternateExchange == null)
+ try
+ {
+ _alternateExchangeId = UUID.fromString((String)altExchObj);
+ }
+ catch (IllegalArgumentException e)
{
- _dependencies.add(new AlternateExchangeDependency());
+ _alternateExchangeName = (String) altExchObj;
}
+ _dependencies.add(new AlternateExchangeDependency());
}
+
}
@Override
@@ -93,9 +109,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
@Override
public AMQQueue resolve()
{
- String queueName = (String) _attributes.get(Queue.NAME);
+ String queueName = (String) _record.getAttributes().get(Queue.NAME);
- _queue = _virtualHost.getQueue(_id);
+ _queue = _virtualHost.getQueue(_record.getId());
if(_queue == null)
{
_queue = _virtualHost.getQueue(queueName);
@@ -103,11 +119,17 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
if (_queue == null)
{
- Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes);
- attributes.put(Queue.ID, _id);
- attributes.put(Queue.DURABLE, true);
- _queue = _virtualHost.restoreQueue(attributes);
+ Map<String,Object> attributesWithId = new HashMap<String,Object>(_record.getAttributes());
+ attributesWithId.put(Queue.ID,_record.getId());
+ attributesWithId.put(Queue.DURABLE,true);
+
+ ConfiguredObjectTypeFactory<? extends Queue> configuredObjectTypeFactory =
+ _objectFactory.getConfiguredObjectTypeFactory(Queue.class, attributesWithId);
+ UnresolvedConfiguredObject<? extends Queue> unresolvedConfiguredObject =
+ configuredObjectTypeFactory.recover(_record, _virtualHost);
+ _queue = (AMQQueue<?>) unresolvedConfiguredObject.resolve();
}
+ _queue.open();
return _queue;
}
@@ -120,6 +142,12 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
}
@Override
+ public String getName()
+ {
+ return _alternateExchangeName;
+ }
+
+ @Override
public String getType()
{
return "Exchange";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index afb65fa326..d0737e8311 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -128,6 +128,4 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
EventLogger getEventLogger();
- // TODO - remove
- public AMQQueue restoreQueue(Map<String, Object> attributes);
}
diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
index a5dfec8a29..cc000d49e6 100644
--- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
+++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
@@ -35,6 +35,11 @@ org.apache.qpid.server.model.adapter.BrokerAdapterFactory
org.apache.qpid.server.model.adapter.StandardVirtualHostFactory
org.apache.qpid.server.model.adapter.FileBasedGroupProviderFactory
org.apache.qpid.server.model.adapter.FileSystemPreferencesProviderFactory
+org.apache.qpid.server.queue.LastValueQueueFactory
+org.apache.qpid.server.queue.PriorityQueueFactory
+org.apache.qpid.server.queue.QueueFactory
+org.apache.qpid.server.queue.SortedQueueFactory
+org.apache.qpid.server.queue.StandardQueueFactory
org.apache.qpid.server.exchange.DirectExchangeFactory
org.apache.qpid.server.exchange.FanoutExchangeFactory
org.apache.qpid.server.exchange.HeadersExchangeFactory
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index c62b541191..5fd8de9ac9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -709,8 +709,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
private AbstractQueue createNonAsyncDeliverQueue()
{
- TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
- return new NonAsyncDeliverQueue(factory, getVirtualHost());
+ return new NonAsyncDeliverQueue(getVirtualHost());
}
/**
@@ -830,7 +829,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
QueueNotificationListener listener = mock(QueueNotificationListener .class);
_queue.setNotificationListener(listener);
- _queue.setAlertThresholdQueueDepthMessages(2);
+ _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ Integer.valueOf(2)));
_queue.enqueue(createMessage(new Long(24)), null);
verifyZeroInteractions(listener);
@@ -849,7 +849,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(createMessage(new Long(26)), null);
_queue.setNotificationListener(listener);
- _queue.setAlertThresholdQueueDepthMessages(2);
+ _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ Integer.valueOf(2)));
verifyZeroInteractions(listener);
@@ -1046,17 +1047,6 @@ abstract class AbstractQueueTestBase extends QpidTestCase
return _consumerTarget;
}
-
- static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
- {
-
- @Override
- public NonAsyncDeliverList createQueueEntryList(final AMQQueue<?> queue)
- {
- return new NonAsyncDeliverList((NonAsyncDeliverQueue) queue);
- }
- }
-
private static class NonAsyncDeliverEntry extends OrderedQueueEntry
{
@@ -1107,9 +1097,23 @@ abstract class AbstractQueueTestBase extends QpidTestCase
private static class NonAsyncDeliverQueue extends AbstractQueue<NonAsyncDeliverQueue>
{
- public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHostImpl vhost)
+ private QueueEntryList _entries = new NonAsyncDeliverList(this);
+
+ public NonAsyncDeliverQueue(VirtualHostImpl vhost)
+ {
+ super(vhost, attributes());
+ }
+
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ }
+
+ @Override
+ QueueEntryList getEntries()
{
- super(vhost, attributes(), factory);
+ return _entries;
}
private static Map<String,Object> attributes()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
index bc1d89a280..dfb6f0ee8c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
@@ -59,7 +59,8 @@ public class LastValueQueueListTest extends TestCase
when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
_queue = new LastValueQueueImpl(virtualHost, queueAttributes);
- _list = (LastValueQueueList) _queue.getEntries();
+ _queue.open();
+ _list = _queue.getEntries();
}
public void testListHasNoEntries()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index ad677a98a7..80e8252cb3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -59,7 +59,8 @@ public class PriorityQueueListTest extends QpidTestCase
when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
PriorityQueueImpl queue = new PriorityQueueImpl(virtualHost, queueAttributes);
- _list = (PriorityQueueList) queue.getEntries();
+ queue.open();
+ _list = queue.getEntries();
for (int i = 0; i < PRIORITIES.length; i++)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index a5a25994ca..80a405d112 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -18,24 +18,24 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
import junit.framework.TestCase;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Tests for {@link QueueEntryImpl}
*/
@@ -200,7 +200,8 @@ public abstract class QueueEntryImplTestBase extends TestCase
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes);
- OrderedQueueEntryList queueEntryList = (OrderedQueueEntryList) queue.getEntries();
+ queue.open();
+ OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
for(int i = 0; i < numberOfEntries ; i++)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
index dc4609734d..42462d75c4 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
@@ -30,9 +30,9 @@ import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
*/
public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
{
- public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue, String propertyName)
+ public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue)
{
- super(queue, propertyName);
+ super(queue);
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 61e396ac27..26220cb8fa 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -20,18 +20,18 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
{
@@ -48,8 +48,8 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes);
-
- queueEntryList = (OrderedQueueEntryList) queue.getEntries();
+ queue.open();
+ queueEntryList = queue.getEntries();
super.setUp();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index bcae391b92..d284324667 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -86,15 +86,23 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
- _testQueue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory()
+ _testQueue = new SortedQueueImpl(virtualHost, attributes)
{
+ SelfValidatingSortedQueueEntryList _entries;
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ _entries = new SelfValidatingSortedQueueEntryList(this);
+ }
@Override
- public SortedQueueEntryList createQueueEntryList(final AMQQueue queue)
+ SelfValidatingSortedQueueEntryList getEntries()
{
- return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY");
+ return _entries;
}
- });
+ };
+ _testQueue.open();
_sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries();
super.setUp();
@@ -124,7 +132,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
{
if(newList)
{
- return new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ return new SelfValidatingSortedQueueEntryList(_testQueue);
}
else
{
@@ -195,7 +203,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testNonUniqueSortKeys() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
// Build test list
long messageId = 0L;
@@ -216,7 +224,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testNullSortKeys() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
// Build test list
long messageId = 0L;
@@ -237,7 +245,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testAscendingSortKeys() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
// Build test list
long messageId = 0L;
@@ -260,7 +268,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testDescendingSortKeys() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
// Build test list
long messageId = 0L;
@@ -283,7 +291,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testInsertAfter() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
@@ -303,7 +311,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testInsertBefore() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
@@ -323,7 +331,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testInsertInbetween() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "A");
_sqel.add(msg);
@@ -354,7 +362,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
public void testInsertAtHead() throws Exception
{
- _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ _sqel = new SelfValidatingSortedQueueEntryList(_testQueue);
ServerMessage msg = generateTestMessage(1, "B");
_sqel.add(msg);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 90c4a82747..1a26d2e2ec 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -56,15 +56,23 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
- SortedQueueImpl queue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory()
+ SortedQueueImpl queue = new SortedQueueImpl(virtualHost, attributes)
{
+ SelfValidatingSortedQueueEntryList _entries;
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ _entries = new SelfValidatingSortedQueueEntryList(this);
+ }
@Override
- public SortedQueueEntryList createQueueEntryList(final AMQQueue queue)
+ SelfValidatingSortedQueueEntryList getEntries()
{
- return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY");
+ return _entries;
}
- });
+ };
+ queue.open();
_queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries();
super.setUp();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 2bab20a1b0..2a6e0d0863 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class StandardQueueEntryListTest extends QueueEntryListTestBase
{
@@ -56,8 +56,8 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
_testQueue = new StandardQueueImpl(virtualHost, queueAttributes);
-
- _sqel = (StandardQueueEntryList) _testQueue.getEntries();
+ _testQueue.open();
+ _sqel = _testQueue.getEntries();
for(int i = 1; i <= 100; i++)
{
final ServerMessage message = mock(ServerMessage.class);
@@ -102,8 +102,8 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes);
-
- return (StandardQueueEntryList) queue.getEntries();
+ queue.open();
+ return queue.getEntries();
}
else
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index f6d04175c5..db1537bcc5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -249,9 +249,17 @@ public class StandardQueueTest extends AbstractQueueTestBase
private static class DequeuedQueue extends AbstractQueue
{
+ private QueueEntryList _entries = new DequeuedQueueEntryList(this);
+
public DequeuedQueue(VirtualHostImpl virtualHost)
{
- super(virtualHost, attributes(), new DequeuedQueueEntryListFactory());
+ super(virtualHost, attributes());
+ }
+
+ @Override
+ QueueEntryList getEntries()
+ {
+ return _entries;
}
private static Map<String,Object> attributes()
@@ -264,19 +272,6 @@ public class StandardQueueTest extends AbstractQueueTestBase
return attributes;
}
}
- private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory
- {
- public DequeuedQueueEntryList createQueueEntryList(AMQQueue queue)
- {
- /**
- * Override SimpleQueueEntryList to create a dequeued
- * entries for messages with even id
- */
- return new DequeuedQueueEntryList((DequeuedQueue) queue);
- }
-
-
- }
private static class DequeuedQueueEntryList extends OrderedQueueEntryList
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index ea5e02e4d9..b793a9182e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -53,6 +53,7 @@ import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -286,7 +287,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueueFieldTable() throws Exception
{
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
@@ -335,7 +336,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
// create queue
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
@@ -361,7 +362,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
// create queue
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
@@ -387,7 +388,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
// create queue
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 95de8fd9e5..2822476b1c 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -67,7 +67,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
-import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.test.utils.QpidTestCase;
public class DurableConfigurationRecovererTest extends QpidTestCase
@@ -84,6 +83,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private DurableConfigurationStore _store;
private ConfiguredObjectFactory _configuredObjectFactory;
private ConfiguredObjectTypeFactory _exchangeFactory;
+ private ConfiguredObjectTypeFactory _queueFactory;
@Override
public void setUp() throws Exception
@@ -91,6 +91,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
super.setUp();
_configuredObjectFactory = mock(ConfiguredObjectFactory.class);
_exchangeFactory = mock(ConfiguredObjectTypeFactory.class);
+ _queueFactory = mock(ConfiguredObjectTypeFactory.class);
+
AMQQueue<?> queue = mock(AMQQueue.class);
@@ -106,6 +108,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory);
+ when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Queue.class), anyMap())).thenReturn(_queueFactory);
+
final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
doAnswer(new Answer()
@@ -131,52 +135,38 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
- final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class);
- when(_vhost.restoreQueue(attributesArg.capture())).then(
- new Answer()
- {
+ final ArgumentCaptor<ConfiguredObjectRecord> recoveredQueue = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
+ doAnswer(new Answer()
+ {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable
- {
- final AMQQueue queue = mock(AMQQueue.class);
-
- final Map attributes = attributesArg.getValue();
- final String queueName = (String) attributes.get(Queue.NAME);
- final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
-
- when(queue.getName()).thenReturn(queueName);
- when(queue.getId()).thenReturn(queueId);
- when(_vhost.getQueue(eq(queueName))).thenReturn(queue);
- when(_vhost.getQueue(eq(queueId))).thenReturn(queue);
-
- final ArgumentCaptor<ExchangeImpl> altExchangeArg = ArgumentCaptor.forClass(ExchangeImpl.class);
- doAnswer(
- new Answer()
- {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable
- {
- final ExchangeImpl value = altExchangeArg.getValue();
- when(queue.getAlternateExchange()).thenReturn(value);
- return null;
- }
- }
- ).when(queue).setAlternateExchange(altExchangeArg.capture());
-
- Map args = attributes;
- if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
- {
- final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
- final ExchangeImpl exchange =
- (ExchangeImpl) _vhost.getExchange(exchangeId);
- queue.setAlternateExchange(exchange);
- }
- return queue;
- }
- });
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ ConfiguredObjectRecord queueRecord = recoveredQueue.getValue();
+ AMQQueue queue = mock(AMQQueue.class);
+ UUID id = queueRecord.getId();
+ String name = (String) queueRecord.getAttributes().get("name");
+ when(queue.getId()).thenReturn(id);
+ when(queue.getName()).thenReturn(name);
+ when(_vhost.getQueue(eq(id))).thenReturn(queue);
+ when(_vhost.getQueue(eq(name))).thenReturn(queue);
+ UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class);
+ when(unresolved.resolve()).thenReturn(queue);
+
+ Map args = queueRecord.getAttributes();
+ if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
+ {
+ final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
+ final ExchangeImpl exchange =
+ _vhost.getExchange(exchangeId);
+ when(queue.getAlternateExchange()).thenReturn(exchange);
+ }
+
+ return unresolved;
+ }
+ }).when(_queueFactory).recover(recoveredQueue.capture(), any(ConfiguredObject.class));
DurableConfiguredObjectRecoverer[] recoverers = {
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index a58a251fbe..85eede527a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -652,12 +652,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
- public AMQQueue restoreQueue(final Map<String, Object> attributes)
- {
- return null;
- }
-
- @Override
public boolean getDefaultDeadLetterQueueEnabled()
{
return false;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index 5313b416c0..6a27946c29 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
@@ -153,21 +154,21 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
AMQQueue queue = _virtualHost.createQueue(attributes);
- ExchangeImpl altExchange = queue.getAlternateExchange();
+ Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
- assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
- assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertTrue("DLQ should have been bound to the alternate exchange", ((ExchangeImpl)altExchange).isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts());
@@ -192,23 +193,23 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
AMQQueue queue = _virtualHost.createQueue(attributes);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts());
- ExchangeImpl altExchange = queue.getAlternateExchange();
+ Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
- assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
- assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertTrue("DLQ should have been bound to the alternate exchange", ((ExchangeImpl)altExchange).isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts());
@@ -234,7 +235,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, false);
AMQQueue queue = _virtualHost.createQueue(attributes);
@@ -266,7 +267,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
//create an autodelete queue
@@ -338,7 +339,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
catch (Exception e)
{
assertTrue(e instanceof IllegalArgumentException);
- assertEquals("Value for attribute name is not found", e.getMessage());
+ assertTrue(e.getMessage().startsWith("The name attribute is mandatory"));
}
}
@@ -359,7 +360,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
_virtualHost.createQueue(attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
@@ -389,7 +390,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, (Object) true);
_virtualHost.createQueue(attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index a3fabf076c..d73d019000 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,28 +20,37 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.transport.*;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
@@ -405,7 +414,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(owningResource instanceof AMQQueue)
{
final AMQQueue queue = (AMQQueue)owningResource;
- final ExchangeImpl alternateExchange = queue.getAlternateExchange();
+ final Exchange alternateExchange = queue.getAlternateExchange();
if(alternateExchange != null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 70094ea7c7..1f108ec3e9 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -63,7 +63,6 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -86,6 +85,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -1616,7 +1616,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
final AMQQueue queue = (AMQQueue) owningResource;
- final ExchangeImpl altExchange = queue.getAlternateExchange();
+ final Exchange altExchange = queue.getAlternateExchange();
if (altExchange == null)
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index e95ed9a383..a9fc160618 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.jmx.mbeans;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
@@ -294,14 +295,19 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
{
if (exchangeName == null || "".equals(exchangeName))
{
- _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), null);
+ _queue.setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null));
}
else
{
- VirtualHost<?,?,?> virtualHost = _queue.getParent(VirtualHost.class);
- Exchange exchange = MBeanUtils.findExchangeFromExchangeName(virtualHost, exchangeName);
+ try
+ {
- _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), exchange);
+ _queue.setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, exchangeName));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new OperationsException("No such exchange \""+exchangeName+"\"");
+ }
}
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index a81de49fb9..6a49b7c4ed 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -19,8 +19,10 @@
package org.apache.qpid.server.jmx.mbeans;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -320,7 +322,7 @@ public class QueueMBeanTest extends QpidTestCase
when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost);
_queueMBean.setAlternateExchange("exchange2");
- verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, mockExchange2);
+ verify(_mockQueue).setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "exchange2"));
}
public void testSetAlternateExchangeWithUnknownExchangeName() throws Exception
@@ -331,7 +333,8 @@ public class QueueMBeanTest extends QpidTestCase
VirtualHost mockVirtualHost = mock(VirtualHost.class);
when(mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange));
when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost);
-
+ doThrow(new IllegalArgumentException()).when(_mockQueue).setAttributes(
+ eq(Collections.<String, Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "notknown")));
try
{
_queueMBean.setAlternateExchange("notknown");
@@ -346,7 +349,7 @@ public class QueueMBeanTest extends QpidTestCase
public void testRemoveAlternateExchange() throws Exception
{
_queueMBean.setAlternateExchange("");
- verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null);
+ verify(_mockQueue).setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null));
}
/********** Operations **********/
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
index 555c4dd20d..c2ea420e4b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.store;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -30,8 +30,9 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import java.util.ArrayList;
-import java.util.List;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class PersistentStoreTest extends QpidBrokerTestCase
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 9949102af8..d37fb4d90f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -562,7 +562,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
else if (lastValueQueue)
{
assertEquals("Queue is no longer a LastValue Queue", LastValueQueueImpl.class, queue.getClass());
- assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((LastValueQueueImpl) queue).getConflationKey());
+ assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((LastValueQueueImpl) queue).getLvqKey());
}
else
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 3bd91faa3e..a6a08d83f9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -18,20 +18,19 @@
*/
package org.apache.qpid.systest.management.jmx;
-import org.apache.commons.lang.time.FastDateFormat;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.queue.NotificationCheckTest;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.StandardQueueImpl;
-import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -48,19 +47,20 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import javax.naming.NamingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.queue.NotificationCheckTest;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.queue.StandardQueueImpl;
+import org.apache.qpid.test.client.destination.AddressBasedDestinationTest;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
* Tests the JMX API for the Managed Queue.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
index 7a954c0185..9246877528 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
@@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Group;
import org.apache.qpid.server.model.GroupProvider;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.adapter.FileBasedGroupProvider;
import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -354,7 +353,6 @@ public class GroupProviderRestTest extends QpidRestTestCase
assertNotNull("Attribute " + Group.NAME, groupName);
assertNotNull("Attribute " + Group.ID, group.get(Group.ID));
- assertEquals("Attribute " + Group.ID, UUIDGenerator.generateGroupUUID(name, groupName).toString(), group.get(Group.ID));
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 4535425ea4..f336c7fc77 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -38,10 +38,10 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.LastValueQueue;
-import org.apache.qpid.server.queue.LastValueQueueImpl;
import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.SortedQueue;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.StandardVirtualHost;
import org.apache.qpid.util.FileUtils;
@@ -274,7 +274,7 @@ public class VirtualHostRestTest extends QpidRestTestCase
Asserts.assertQueue(queueName , "lvq", lvqQueue);
assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE));
- assertEquals("Unexpected lvq key attribute", LastValueQueueImpl.DEFAULT_LVQ_KEY, lvqQueue.get(LastValueQueue.LVQ_KEY));
+ assertEquals("Unexpected lvq key attribute", LastValueQueue.DEFAULT_LVQ_KEY, lvqQueue.get(LastValueQueue.LVQ_KEY));
}
public void testPutCreateSortedQueueWithoutKey() throws Exception
@@ -443,15 +443,15 @@ public class VirtualHostRestTest extends QpidRestTestCase
String queueName = getTestQueueName();
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.CREATE_DLQ_ON_CREATION, true);
+ attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
//verify the starting state
Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
List<Map<String, Object>> queues = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_QUEUES_ATTRIBUTE);
List<Map<String, Object>> exchanges = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_EXCHANGES_ATTRIBUTE);
- assertNull("queue should not have already been present", getRestTestHelper().find(Queue.NAME, queueName , queues));
- assertNull("queue should not have already been present", getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues));
+ assertNull("queue "+ queueName + " should not have already been present", getRestTestHelper().find(Queue.NAME, queueName , queues));
+ assertNull("queue "+ queueName + "_DLQ should not have already been present", getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues));
assertNull("exchange should not have already been present", getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges));
//create the queue
@@ -465,9 +465,9 @@ public class VirtualHostRestTest extends QpidRestTestCase
Map<String, Object> queue = getRestTestHelper().find(Queue.NAME, queueName , queues);
Map<String, Object> dlqQueue = getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues);
Map<String, Object> dlExchange = getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges);
- assertNotNull("queue should not have been present", queue);
- assertNotNull("queue should not have been present", dlqQueue);
- assertNotNull("exchange should not have been present", dlExchange);
+ assertNotNull("queue should have been present", queue);
+ assertNotNull("queue should have been present", dlqQueue);
+ assertNotNull("exchange should have been present", dlExchange);
//verify that the alternate exchange is set as expected on the new queue
Map<String, Object> queueAttributes = new HashMap<String, Object>();