diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-04-18 13:37:32 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-04-18 13:37:32 +0000 |
| commit | ef4adc559cc4b81a0c681807986c62fc0b9a13e4 (patch) | |
| tree | a66bb7845d9074bb79253f11bd5636c9a30e5324 /qpid/java | |
| parent | 36c6512134a729f2f7abb1fa6469a63b743dad1b (diff) | |
| download | qpid-python-ef4adc559cc4b81a0c681807986c62fc0b9a13e4.tar.gz | |
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
70 files changed, 1101 insertions, 1003 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java deleted file mode 100644 index 4b2655e8c5..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedObjectWithParents.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; - -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.UnresolvedObject; - -public interface UnresolvedObjectWithParents<X> extends UnresolvedObject<X> -{ - void resolvedParent(final UnresolvedParentDependency<?> unresolvedParentDependency, - final ConfiguredObject<?> dependency); -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java deleted file mode 100644 index ae10106270..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/startup/UnresolvedParentDependency.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.startup; - -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.ConfiguredObjectRecord; -import org.apache.qpid.server.store.UnresolvedDependency; - -import java.util.UUID; - -class UnresolvedParentDependency<X extends ConfiguredObject<X>> implements UnresolvedDependency<X> -{ - private final UUID _id; - private final String _type; - private final UnresolvedObjectWithParents _unresolvedObject; - - public UnresolvedParentDependency(final UnresolvedObjectWithParents unresolvedObject, - final String type, - final ConfiguredObjectRecord record) - { - _type = type; - _id = record.getId(); - _unresolvedObject = unresolvedObject; - } - - @Override - public UUID getId() - { - return _id; - } - - @Override - public String getType() - { - return _type; - } - - @Override - public void resolve(final X dependency) - { - _unresolvedObject.resolvedParent(this, dependency); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java index e6e4d0052b..b21ecd3811 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java @@ -784,7 +784,7 @@ public class MemoryConfigurationEntryStore implements ConfigurationEntryStore UUID id = null; if (idAsString == null) { - id = UUIDGenerator.generateRandomUUID(); + id = UUID.randomUUID(); _generatedObjectIdDuringLoad = true; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index a04635ec74..c2be3ee3cf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -194,6 +194,10 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im _id = uuid; _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this); + if(_name == null) + { + throw new IllegalArgumentException("The name attribute is mandatory for " + getClass().getSimpleName() + " creation."); + } _attributeTypes = getAttributeTypes(getClass()); _automatedFields = getAutomatedFields(getClass()); @@ -267,6 +271,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im if(attr.getAnnotation().mandatory() && !(_attributes.containsKey(attr.getName()) || !"".equals(attr.getAnnotation().defaultValue()))) { + deleted(); throw new IllegalArgumentException("Mandatory attribute " + attr.getName() + " not supplied for instance of " + getClass().getName()); } } @@ -367,9 +372,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if(_open.compareAndSet(false,true)) { - doResolution(); - doValidation(); - doOpening(); + doResolution(true); + doValidation(true); + doOpening(true); } } @@ -378,75 +383,87 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if(_open.compareAndSet(false,true)) { - doResolution(); - doValidation(); - doCreation(); - doOpening(); + doResolution(true); + doValidation(true); + doCreation(true); + doOpening(true); } } - protected void doOpening() + protected void doOpening(final boolean skipCheck) { - onOpen(); - applyToChildren(new Action<ConfiguredObject<?>>() + if(skipCheck || _open.compareAndSet(false,true)) { - @Override - public void performAction(final ConfiguredObject<?> child) + onOpen(); + applyToChildren(new Action<ConfiguredObject<?>>() { - if(child instanceof AbstractConfiguredObject) + @Override + public void performAction(final ConfiguredObject<?> child) { - ((AbstractConfiguredObject)child).doOpening(); + if (child instanceof AbstractConfiguredObject) + { + ((AbstractConfiguredObject) child).doOpening(false); + } } - } - }); + }); + } } - protected final void doValidation() + protected final void doValidation(final boolean skipCheck) { - applyToChildren(new Action<ConfiguredObject<?>>() + if(skipCheck || !_open.get()) { - @Override - public void performAction(final ConfiguredObject<?> child) + applyToChildren(new Action<ConfiguredObject<?>>() { - if(child instanceof AbstractConfiguredObject) + @Override + public void performAction(final ConfiguredObject<?> child) { - ((AbstractConfiguredObject)child).doValidation(); + if (child instanceof AbstractConfiguredObject) + { + ((AbstractConfiguredObject) child).doValidation(false); + } } - } - }); - validate(); + }); + validate(); + } } - protected final void doResolution() + protected final void doResolution(final boolean skipCheck) { - resolve(); - applyToChildren(new Action<ConfiguredObject<?>>() + if(skipCheck || !_open.get()) { - @Override - public void performAction(final ConfiguredObject<?> child) + resolve(); + applyToChildren(new Action<ConfiguredObject<?>>() { - if(child instanceof AbstractConfiguredObject) + @Override + public void performAction(final ConfiguredObject<?> child) { - ((AbstractConfiguredObject)child).doResolution(); + if (child instanceof AbstractConfiguredObject) + { + ((AbstractConfiguredObject) child).doResolution(false); + } } - } - }); + }); + } } - protected final void doCreation() + protected final void doCreation(final boolean skipCheck) { - onCreate(); - applyToChildren(new Action<ConfiguredObject<?>>() + if(skipCheck || !_open.get()) { - @Override - public void performAction(final ConfiguredObject<?> child) + onCreate(); + applyToChildren(new Action<ConfiguredObject<?>>() { - if(child instanceof AbstractConfiguredObject) + @Override + public void performAction(final ConfiguredObject<?> child) { - ((AbstractConfiguredObject)child).doCreation(); + if (child instanceof AbstractConfiguredObject) + { + ((AbstractConfiguredObject) child).doCreation(false); + } } - } - }); + }); + } } private void applyToChildren(Action<ConfiguredObject<?>> action) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java index a965044c90..38f885717f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java @@ -32,7 +32,7 @@ abstract public class AbstractConfiguredObjectTypeFactory<X extends AbstractConf { private final Class<X> _clazz; - protected AbstractConfiguredObjectTypeFactory(final Class<X> clazz) + public AbstractConfiguredObjectTypeFactory(final Class<X> clazz) { _clazz = clazz; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java index 5c4ed71902..e7fb50be87 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java @@ -46,6 +46,6 @@ public final class ConfiguredObjectAttribute<C extends ConfiguredObject, T> exte public T convert(final Object value, C object) { - return _converter.convert(value, object); + return getConverter().convert(value, object); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java index da1edd7da2..2e1350981c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttributeOrStatistic.java @@ -26,18 +26,18 @@ import java.lang.reflect.Method; abstract class ConfiguredObjectAttributeOrStatistic<C extends ConfiguredObject, T> { - protected final String _name; - protected final Class<T> _type; - protected final AttributeValueConverter<T> _converter; - protected final Method _getter; + private final String _name; + private final Class<T> _type; + private final AttributeValueConverter<T> _converter; + private final Method _getter; ConfiguredObjectAttributeOrStatistic(final Method getter) { _getter = getter; _type = (Class<T>) getTypeFromMethod(getter); - _name = getNameFromMethod(getter, _type); - _converter = AttributeValueConverter.getConverter(_type, getter.getGenericReturnType()); + _name = getNameFromMethod(getter, getType()); + _converter = AttributeValueConverter.getConverter(getType(), getter.getGenericReturnType()); } @@ -133,17 +133,17 @@ abstract class ConfiguredObjectAttributeOrStatistic<C extends ConfiguredObject, { try { - return (T) _getter.invoke(configuredObject); + return (T) getGetter().invoke(configuredObject); } catch (IllegalAccessException e) { - Object o = configuredObject.getAttribute(_name); - return _converter.convert(o, configuredObject); + Object o = configuredObject.getAttribute(getName()); + return getConverter().convert(o, configuredObject); } catch (InvocationTargetException e) { - Object o = configuredObject.getAttribute(_name); - return _converter.convert(o, configuredObject); + Object o = configuredObject.getAttribute(getName()); + return getConverter().convert(o, configuredObject); } } @@ -152,4 +152,9 @@ abstract class ConfiguredObjectAttributeOrStatistic<C extends ConfiguredObject, { return _getter; } + + public AttributeValueConverter<T> getConverter() + { + return _converter; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java index e7b5dd971d..551f27aee2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java @@ -23,8 +23,10 @@ package org.apache.qpid.server.model; import java.util.Collection; import java.util.Map; +import org.apache.qpid.server.message.MessageDestination; + @ManagedObject -public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X> +public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination { String STATE = "state"; String ALTERNATE_EXCHANGE = "alternateExchange"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index fed8862782..fb9548c429 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -45,27 +45,35 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; String QUEUE_FLOW_STOPPED = "queueFlowStopped"; - String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change - - @ManagedAttribute + @ManagedAttribute( automate = true ) Exchange getAlternateExchange(); @ManagedAttribute( automate = true, defaultValue = "NONE" ) ExclusivityPolicy getExclusive(); - @ManagedAttribute + @ManagedAttribute( derived = true ) String getOwner(); - @ManagedAttribute - boolean getNoLocal(); + @ManagedAttribute( automate = true ) + boolean isNoLocal(); - @ManagedAttribute + @ManagedAttribute( automate = true ) String getMessageGroupKey(); + @ManagedContextDefault( name = "qpid.broker_default-shared-message-group") + String DEFAULT_SHARED_MESSAGE_GROUP = "qpid.no-group"; + + @ManagedAttribute( automate = true, defaultValue = "${qpid.broker_default-shared-message-group}") + String getMessageGroupDefaultGroup(); + + @ManagedContextDefault( name = "queue.maximumDistinctGroups") + int DEFAULT_MAXIMUM_DISTINCT_GROUPS = 255; + + @ManagedAttribute( automate = true, defaultValue = "${queue.maximumDistinctGroups}") + int getMaximumDistinctGroups(); - // TODO - this should either be a boolean or maybe an enum - @ManagedAttribute + @ManagedAttribute( automate = true ) boolean isMessageGroupSharedGroups(); @ManagedContextDefault( name = "queue.maximumDeliveryAttempts") diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index be14b284b1..ae2c412077 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -46,7 +46,6 @@ import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -139,7 +138,7 @@ public class FileBasedGroupProviderImpl for (Principal group : groups) { Map<String,Object> attrMap = new HashMap<String, Object>(); - UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName()); + UUID id = UUID.randomUUID(); attrMap.put(Group.ID, id); attrMap.put(Group.NAME, group.getName()); GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor()); @@ -228,7 +227,7 @@ public class FileBasedGroupProviderImpl _groupDatabase.createGroup(groupName); Map<String,Object> attrMap = new HashMap<String, Object>(); - UUID id = UUIDGenerator.generateGroupUUID(getName(),groupName); + UUID id = UUID.randomUUID(); attrMap.put(Group.ID, id); attrMap.put(Group.NAME, groupName); GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor()); @@ -442,7 +441,7 @@ public class FileBasedGroupProviderImpl Collection<GroupMember> members = new ArrayList<GroupMember>(); for (Principal principal : usersInGroup) { - UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName()); + UUID id = UUID.randomUUID(); Map<String,Object> attrMap = new HashMap<String, Object>(); attrMap.put(GroupMember.ID,id); attrMap.put(GroupMember.NAME, principal.getName()); @@ -493,7 +492,7 @@ public class FileBasedGroupProviderImpl getSecurityManager().authoriseGroupOperation(Operation.UPDATE, getName()); _groupDatabase.addUserToGroup(memberName, getName()); - UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), memberName); + UUID id = UUID.randomUUID(); Map<String,Object> attrMap = new HashMap<String, Object>(); attrMap.put(GroupMember.ID,id); attrMap.put(GroupMember.NAME, memberName); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java index 9a01cd6aeb..0b1409c5b9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java @@ -25,13 +25,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationMethod; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; @@ -52,7 +52,7 @@ public class VirtualHostAliasAdapter extends AbstractConfiguredObject<VirtualHos private static Map<String, Object> createAttributes(final VirtualHost virtualHost, final Port port) { final Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(ID, UUIDGenerator.generateVhostAliasUUID(virtualHost.getName(), port.getName())); + attributes.put(ID, UUID.randomUUID()); attributes.put(NAME, virtualHost.getName()); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 6a2a6449eb..e31eac77b4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -25,12 +25,10 @@ import java.util.List; import java.util.Set; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.protocol.CapacityChecker; @@ -60,10 +58,6 @@ public interface AMQQueue<X extends AMQQueue<X>> long getTotalEnqueuedMessages(); - LifetimePolicy getLifetimePolicy(); - - String getOwner(); - VirtualHostImpl getVirtualHost(); public Collection<QueueConsumer<?>> getConsumers(); @@ -118,41 +112,6 @@ public interface AMQQueue<X extends AMQQueue<X>> void visit(QueueEntryVisitor visitor); - - long getAlertThresholdMessageSize(); - - void setAlertThresholdMessageSize(long value); - - - long getAlertThresholdQueueDepthMessages(); - - void setAlertThresholdQueueDepthMessages(long value); - - - long getAlertThresholdQueueDepthBytes(); - - void setAlertThresholdQueueDepthBytes(long value); - - - long getAlertThresholdMessageAge(); - - void setAlertThresholdMessageAge(final long maximumMessageAge); - - - long getAlertRepeatGap(); - - void setAlertRepeatGap(long value); - - - long getQueueFlowControlSizeBytes(); - - void setQueueFlowControlSizeBytes(long capacity); - - - long getQueueFlowResumeSizeBytes(); - - void setQueueFlowResumeSizeBytes(long flowResumeCapacity); - boolean isOverfull(); long clearQueue(); @@ -168,10 +127,6 @@ public interface AMQQueue<X extends AMQQueue<X>> void stop(); - ExchangeImpl getAlternateExchange(); - - void setAlternateExchange(ExchangeImpl exchange); - Collection<String> getAvailableAttributes(); Object getAttribute(String attrName); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f6a667e612..faa44b2288 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -18,7 +18,6 @@ */ package org.apache.qpid.server.queue; -import java.lang.reflect.Type; import java.security.AccessControlException; import java.security.AccessController; import java.security.Principal; @@ -27,7 +26,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -46,7 +44,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; @@ -54,6 +51,7 @@ import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; @@ -100,10 +98,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; private static final String QPID_NO_GROUP = "qpid.no-group"; - private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); - // TODO - should make this configurable at the vhost / broker level - private static final int DEFAULT_MAX_GROUPS = 255; private static final QueueNotificationListener NULL_NOTIFICATION_LISTENER = new QueueNotificationListener() { @Override @@ -118,11 +113,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private final VirtualHostImpl _virtualHost; private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); - private ExchangeImpl _alternateExchange; + @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange") + private Exchange _alternateExchange; - private final QueueEntryList _entries; - private final QueueConsumerList _consumerList = new QueueConsumerList(); private volatile QueueConsumer<?> _exclusiveSubscriber; @@ -203,6 +197,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private LogSubject _logSubject; + @ManagedAttributeField private boolean _noLocal; private final AtomicBoolean _overfull = new AtomicBoolean(false); @@ -224,19 +219,37 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private QueueNotificationListener _notificationListener; private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + @ManagedAttributeField + private String _messageGroupKey; + @ManagedAttributeField + private boolean _messageGroupSharedGroups; + @ManagedAttributeField + private String _messageGroupDefaultGroup; + @ManagedAttributeField + private int _maximumDistinctGroups; + protected AbstractQueue(VirtualHostImpl virtualHost, - Map<String, Object> attributes, - QueueEntryListFactory entryListFactory) + Map<String, Object> attributes) { super(parentsMap(virtualHost), attributes, virtualHost.getTaskExecutor()); - _entries = entryListFactory.createQueueEntryList(this); _virtualHost = virtualHost; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + } + + @Override + protected void onCreate() + { + super.onCreate(); + if (isDurable() && !(getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE + || getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), this); + } } public void validate() @@ -308,9 +321,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _arguments = Collections.synchronizedMap(arguments); - _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); - - _logSubject = new QueueLogSubject(this); try @@ -423,29 +433,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 getEventLogger().message(_logSubject, - QueueMessages.CREATED(ownerString, - _entries.getPriorities(), - ownerString != null, - getLifetimePolicy() != LifetimePolicy.PERMANENT, - isDurable(), - !isDurable(), - _entries.getPriorities() > 0)); + getCreatedLogMessage()); - if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) + if(getMessageGroupKey() != null) { - if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null - && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + if(isMessageGroupSharedGroups()) { - Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)), - defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), - this); + new DefinedGroupMessageGroupManager(getMessageGroupKey(), getMessageGroupDefaultGroup(), this); } else { - _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get( - Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); + _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKey(), getMaximumDistinctGroups()); } } else @@ -456,6 +455,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> updateAlertChecks(); } + protected LogMessage getCreatedLogMessage() + { + String ownerString = getOwner(); + return QueueMessages.CREATED(ownerString, + 0, + ownerString != null, + getLifetimePolicy() != LifetimePolicy.PERMANENT, + isDurable(), + !isDurable(), + false); + } + private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject) { final Action<Deletable> deleteQueueTask = new Action<Deletable>() @@ -501,32 +512,35 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } - public void setNoLocal(boolean nolocal) - { - _noLocal = nolocal; - } - public boolean isExclusive() { return _exclusive != ExclusivityPolicy.NONE; } - public ExchangeImpl getAlternateExchange() + public Exchange<?> getAlternateExchange() { return _alternateExchange; } public void setAlternateExchange(ExchangeImpl exchange) { - if(_alternateExchange != null) + _alternateExchange = exchange; + } + + private void postSetAlternateExchange() + { + if(_alternateExchange instanceof ExchangeImpl) { - _alternateExchange.removeReference(this); + ((ExchangeImpl)_alternateExchange).addReference(this); } - if(exchange != null) + } + + private void preSetAlternateExchange() + { + if(_alternateExchange instanceof ExchangeImpl) { - exchange.addReference(this); + ((ExchangeImpl)_alternateExchange).removeReference(this); } - _alternateExchange = exchange; } @@ -712,7 +726,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } consumer.setStateListener(this); - consumer.setQueueContext(new QueueContext(_entries.getHead())); + consumer.setQueueContext(new QueueContext(getEntries().getHead())); if (!isDeleted()) { @@ -908,7 +922,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber; - final QueueEntry entry = _entries.add(message); + final QueueEntry entry = getEntries().add(message); if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) { @@ -1271,7 +1285,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> protected QueueEntry getOldestQueueEntry() { - return _entries.next(_entries.getHead()); + return getEntries().next(getEntries().getHead()); } public boolean isDeleted() @@ -1282,7 +1296,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public List<QueueEntry> getMessagesOnTheQueue() { ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator queueListIterator = getEntries().iterator(); while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); @@ -1344,10 +1358,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - protected QueueEntryList getEntries() - { - return _entries; - } + abstract QueueEntryList getEntries(); protected QueueConsumerList getConsumerList() { @@ -1410,7 +1421,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter) { ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>(); - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator queueListIterator = getEntries().iterator(); while (queueListIterator.advance() && !filter.filterComplete()) { QueueEntry node = queueListIterator.getNode(); @@ -1425,7 +1436,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public void visit(final QueueEntryVisitor visitor) { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator queueListIterator = getEntries().iterator(); while(queueListIterator.advance()) { @@ -1487,7 +1498,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> //Perform ACLs getVirtualHost().getSecurityManager().authorisePurge(this); - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator queueListIterator = getEntries().iterator(); long count = 0; ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); @@ -1605,10 +1616,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> txn.commit(); - if(_alternateExchange != null) - { - _alternateExchange.removeReference(this); - } + preSetAlternateExchange(); for (Action<? super AMQQueue> task : _deleteTaskList) @@ -1834,7 +1842,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } - atTail = (node == null) || (_entries.next(node) == null); + atTail = (node == null) || (getEntries().next(node) == null); } return atTail || !subActive; } @@ -1865,7 +1873,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> QueueEntry lastSeen = context.getLastSeenEntry(); QueueEntry releasedNode = context.getReleasedEntry(); - QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); + QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : getEntries() + .next(lastSeen); boolean expired = false; while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || @@ -1887,7 +1896,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> lastSeen = context.getLastSeenEntry(); releasedNode = context.getReleasedEntry(); - node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); + node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : getEntries().next( + lastSeen); } return node; } @@ -2059,7 +2069,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public void checkMessageStatus() { - QueueEntryIterator queueListIterator = _entries.iterator(); + QueueEntryIterator queueListIterator = getEntries().iterator(); while (queueListIterator.advance()) { @@ -2098,22 +2108,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return _alertRepeatGap; } - public void setAlertRepeatGap(long alertRepeatGap) - { - _alertRepeatGap = alertRepeatGap; - } - public long getAlertThresholdMessageAge() { return _alertThresholdMessageAge; } - public void setAlertThresholdMessageAge(long alertThresholdMessageAge) - { - _alertThresholdMessageAge = alertThresholdMessageAge; - updateNotificationCheck(alertThresholdMessageAge, NotificationCheck.MESSAGE_AGE_ALERT); - } - public long getAlertThresholdQueueDepthMessages() { return _alertThresholdQueueDepthMessages; @@ -2139,59 +2138,26 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } - public void setAlertThresholdQueueDepthMessages(final long alertThresholdQueueDepthMessages) - { - _alertThresholdQueueDepthMessages = alertThresholdQueueDepthMessages; - updateNotificationCheck(alertThresholdQueueDepthMessages, NotificationCheck.MESSAGE_COUNT_ALERT); - - } - public long getAlertThresholdQueueDepthBytes() { return _alertThresholdQueueDepthBytes; } - // Sets the queue depth, the max queue size - public void setAlertThresholdQueueDepthBytes(final long alertThresholdQueueDepthBytes) - { - _alertThresholdQueueDepthBytes = alertThresholdQueueDepthBytes; - updateNotificationCheck(alertThresholdQueueDepthBytes, NotificationCheck.QUEUE_DEPTH_ALERT); - - } - public long getAlertThresholdMessageSize() { return _alertThresholdMessageSize; } - public void setAlertThresholdMessageSize(final long alertThresholdMessageSize) - { - _alertThresholdMessageSize = alertThresholdMessageSize; - updateNotificationCheck(alertThresholdMessageSize, NotificationCheck.MESSAGE_SIZE_ALERT); - } - public long getQueueFlowControlSizeBytes() { return _queueFlowControlSizeBytes; } - public void setQueueFlowControlSizeBytes(long queueFlowControlSizeBytes) - { - _queueFlowControlSizeBytes = queueFlowControlSizeBytes; - } - public long getQueueFlowResumeSizeBytes() { return _queueFlowResumeSizeBytes; } - public void setQueueFlowResumeSizeBytes(long queueFlowResumeSizeBytes) - { - _queueFlowResumeSizeBytes = queueFlowResumeSizeBytes; - - checkCapacity(); - } - public boolean isOverfull() { return _overfull.get(); @@ -2258,7 +2224,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> public List<Long> getMessagesOnTheQueue(int num, int offset) { ArrayList<Long> ids = new ArrayList<Long>(num); - QueueEntryIterator it = _entries.iterator(); + QueueEntryIterator it = getEntries().iterator(); for (int i = 0; i < offset; i++) { it.advance(); @@ -2657,7 +2623,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override - public boolean getNoLocal() + public boolean isNoLocal() { return _noLocal; } @@ -2665,15 +2631,26 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override public String getMessageGroupKey() { - return (String) getAttribute(MESSAGE_GROUP_KEY); + return _messageGroupKey; } @Override public boolean isMessageGroupSharedGroups() { - return (Boolean) getAttribute(MESSAGE_GROUP_SHARED_GROUPS); + return _messageGroupSharedGroups; } + @Override + public String getMessageGroupDefaultGroup() + { + return _messageGroupDefaultGroup; + } + + @Override + public int getMaximumDistinctGroups() + { + return _maximumDistinctGroups; + } @Override public boolean isQueueFlowStopped() @@ -2728,14 +2705,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { try { - if(ALTERNATE_EXCHANGE.equals(name)) - { - // In future we may want to accept a UUID as an alternative way to identifying the exchange - ExchangeImpl alternateExchange = (ExchangeImpl) desired; - setAlternateExchange(alternateExchange); - return true; - } - else if(EXCLUSIVE.equals(name)) + if(EXCLUSIVE.equals(name)) { ExclusivityPolicy existingPolicy = getExclusive(); if(super.changeAttribute(name, expected, desired)) @@ -2776,54 +2746,41 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> _virtualHost.getSecurityManager().authoriseUpdate(this); } - @SuppressWarnings("serial") - static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ - put(ALERT_REPEAT_GAP, Long.class); - put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class); - put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class); - put(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class); - put(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class); - put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class); - put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class); - put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); - put(DESCRIPTION, String.class); - }}); + private static final String[] NON_NEGATIVE_NUMBERS = { + ALERT_REPEAT_GAP, + ALERT_THRESHOLD_MESSAGE_AGE, + ALERT_THRESHOLD_MESSAGE_SIZE, + ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + QUEUE_FLOW_CONTROL_SIZE_BYTES, + QUEUE_FLOW_RESUME_SIZE_BYTES, + MAXIMUM_DELIVERY_ATTEMPTS + }; @Override - protected void changeAttributes(final Map<String, Object> attributes) + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) { - Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES); - validateAttributes(convertedAttributes); - - super.changeAttributes(convertedAttributes); - } - - private void validateAttributes(Map<String, Object> convertedAttributes) - { - Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES); - Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_RESUME_SIZE_BYTES); - if (queueFlowControlSize != null || queueFlowControlResumeSize != null ) + super.validateChange(proxyForValidation, changedAttributes); + Queue<?> queue = (Queue) proxyForValidation; + long queueFlowControlSize = queue.getQueueFlowControlSizeBytes(); + long queueFlowControlResumeSize = queue.getQueueFlowResumeSizeBytes(); + if (queueFlowControlResumeSize > queueFlowControlSize) { - if (queueFlowControlSize == null) - { - queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES); - } - if (queueFlowControlResumeSize == null) - { - queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES); - } - if (queueFlowControlResumeSize > queueFlowControlSize) - { - throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size"); - } + throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size"); } - for (Map.Entry<String, Object> entry: convertedAttributes.entrySet()) + + + for (String attrName : NON_NEGATIVE_NUMBERS) { - Object value = entry.getValue(); - if (value instanceof Number && ((Number)value).longValue() < 0) + if (changedAttributes.contains(attrName)) { - throw new IllegalConfigurationException("Only positive integer value can be specified for the attribute " - + entry.getKey()); + Object value = queue.getAttribute(attrName); + if (!(value instanceof Number) || ((Number) value).longValue() < 0) + { + throw new IllegalConfigurationException( + "Only positive integer value can be specified for the attribute " + + attrName); + } } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java index c97a789f65..a9c1fec246 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.model.ManagedObject; @ManagedObject( category = false, type="lvq" ) @@ -28,6 +29,9 @@ public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X> { String LVQ_KEY = "lvqKey"; - @ManagedAttribute + @ManagedContextDefault( name = "queue.lvqKey" ) + String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + + @ManagedAttribute(automate = true, defaultValue = "${queue.lvqKey}") String getLvqKey(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java new file mode 100644 index 0000000000..2e34d0fc3e --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class LastValueQueueFactory extends AbstractConfiguredObjectTypeFactory<LastValueQueueImpl> +{ + public LastValueQueueFactory() + { + super(LastValueQueueImpl.class); + } + + @Override + protected LastValueQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + + return new LastValueQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java index 079f04f92f..d888637d80 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java @@ -23,52 +23,39 @@ package org.apache.qpid.server.queue; import java.util.Map; -import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implements LastValueQueue<LastValueQueueImpl> { - public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + private LastValueQueueList _entries; + + @ManagedAttributeField + private String _lvqKey; public LastValueQueueImpl(VirtualHostImpl virtualHost, Map<String, Object> attributes) { - super(virtualHost, attributes, entryList(attributes)); + super(virtualHost, attributes); } - private static LastValueQueueList.Factory entryList(final Map<String, Object> attributes) - { - - String conflationKey = MapValueConverter.getStringAttribute(LVQ_KEY, - attributes, - DEFAULT_LVQ_KEY); - - // conflation key can still be null if it was present in the map with a null value - return new LastValueQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); - } - - public String getConflationKey() + @Override + protected void onOpen() { - return ((LastValueQueueList)getEntries()).getConflationKey(); + super.onOpen(); + _entries = new LastValueQueueList(this); } @Override - public Object getAttribute(final String name) + LastValueQueueList getEntries() { - if(LVQ_KEY.equals(name)) - { - if(this instanceof LastValueQueueImpl) - { - return getConflationKey(); - } - } - return super.getAttribute(name); + return _entries; } @Override public String getLvqKey() { - return getConflationKey(); + return _lvqKey; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java index cd586e9629..6f1edf12e5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java @@ -54,10 +54,10 @@ public class LastValueQueueList extends OrderedQueueEntryList private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this); private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this); - public LastValueQueueList(LastValueQueueImpl queue, String conflationKey) + public LastValueQueueList(LastValueQueueImpl queue) { super(queue, HEAD_CREATOR); - _conflationKey = conflationKey; + _conflationKey = queue.getLvqKey(); } private ConflationQueueEntry createHead() @@ -65,11 +65,6 @@ public class LastValueQueueList extends OrderedQueueEntryList return new ConflationQueueEntry(this); } - public String getConflationKey() - { - return _conflationKey; - } - @Override protected ConflationQueueEntry createQueueEntry(ServerMessage message) { @@ -254,20 +249,4 @@ public class LastValueQueueList extends OrderedQueueEntryList { return Collections.unmodifiableMap(_latestValuesMap); } - - static class Factory implements QueueEntryListFactory - { - private final String _conflationKey; - - Factory(String conflationKey) - { - _conflationKey = conflationKey; - } - - @Override - public LastValueQueueList createQueueEntryList(final AMQQueue<?> queue) - { - return new LastValueQueueList((LastValueQueueImpl)queue, _conflationKey); - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 0797bbd4e9..39317f455d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -28,10 +28,9 @@ public abstract class OutOfOrderQueue<X extends OutOfOrderQueue<X>> extends Abst { protected OutOfOrderQueue(VirtualHostImpl virtualHost, - Map<String, Object> attributes, - QueueEntryListFactory entryListFactory) + Map<String, Object> attributes) { - super(virtualHost, attributes, entryListFactory); + super(virtualHost, attributes); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java index 8e8732d595..87693b793d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.model.ManagedObject; @ManagedObject( category = false, type="priority" ) @@ -28,6 +29,9 @@ public interface PriorityQueue<X extends PriorityQueue<X>> extends AMQQueue<X> { String PRIORITIES = "priorities"; - @ManagedAttribute + @ManagedContextDefault( name = "queue.priorities") + int DEFAULT_PRIORITY_LEVELS = 10; + + @ManagedAttribute( automate = true, defaultValue = "${queue.priorities}") int getPriorities(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java new file mode 100644 index 0000000000..f7a46d5bef --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class PriorityQueueFactory extends AbstractConfiguredObjectTypeFactory<PriorityQueueImpl> +{ + public PriorityQueueFactory() + { + super(PriorityQueueImpl.class); + } + + @Override + protected PriorityQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + + return new PriorityQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java index 89d542aded..20759797d2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java @@ -22,43 +22,55 @@ package org.apache.qpid.server.queue; import java.util.Map; -import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implements PriorityQueue<PriorityQueueImpl> { - public static final int DEFAULT_PRIORITY_LEVELS = 10; + private PriorityQueueList _entries; + + @ManagedAttributeField + private int _priorities; public PriorityQueueImpl(VirtualHostImpl virtualHost, - Map<String, Object> attributes) + Map<String, Object> attributes) { - super(virtualHost, attributes, entryList(attributes)); + super(virtualHost, attributes); } - private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes) + @Override + protected void onOpen() { - final Integer priorities = MapValueConverter.getIntegerAttribute(PRIORITIES, attributes, - DEFAULT_PRIORITY_LEVELS); - - return new PriorityQueueList.Factory(priorities); + super.onOpen(); + _entries = PriorityQueueList.newInstance(this); } @Override public int getPriorities() { - return getEntries().getPriorities(); + return _priorities; } @Override - public Object getAttribute(final String name) + PriorityQueueList getEntries() { + return _entries; + } - if(PRIORITIES.equals(name)) - { - return getPriorities(); - } - - return super.getAttribute(name); + protected LogMessage getCreatedLogMessage() + { + String ownerString = getOwner(); + return QueueMessages.CREATED(ownerString, + getPriorities(), + ownerString != null, + getLifetimePolicy() != LifetimePolicy.PERMANENT, + isDurable(), + !isDurable(), + true); } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index f72c8cd57a..076b6c9e73 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -26,6 +26,11 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList { + public static PriorityQueueList newInstance(PriorityQueueImpl queue) + { + return new PriorityQueueMasterList(queue, queue.getPriorities()); + } + public PriorityQueueList(final PriorityQueueImpl queue, final HeadCreator headCreator) { @@ -185,20 +190,6 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList } } - static class Factory implements QueueEntryListFactory - { - private final int _priorities; - - Factory(int priorities) - { - _priorities = priorities; - } - - public PriorityQueueList createQueueEntryList(AMQQueue<?> queue) - { - return new PriorityQueueMasterList((PriorityQueueImpl) queue, _priorities); - } - } static class PriorityQueueEntrySubList extends PriorityQueueList { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index c1687de86e..15df952e61 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; public class QueueArgumentsConverter { @@ -84,7 +85,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); - ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, Queue.CREATE_DLQ_ON_CREATION); + ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION); ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY); //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS); ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP); @@ -108,7 +109,7 @@ public class QueueArgumentsConverter } if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) { - modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY); + modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueue.DEFAULT_LVQ_KEY); } if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { @@ -117,7 +118,7 @@ public class QueueArgumentsConverter } if(wireArguments.get(X_QPID_DLQ_ENABLED) != null) { - modelArguments.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString())); + modelArguments.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString())); } if(wireArguments.get(QPID_NO_LOCAL) != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index b62e100eea..6fa7801608 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,29 +20,29 @@ */ package org.apache.qpid.server.queue; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + import org.apache.log4j.Logger; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; -import java.util.EnumMap; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - public abstract class QueueEntryImpl implements QueueEntry { private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); @@ -365,7 +365,7 @@ public abstract class QueueEntryImpl implements QueueEntry public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); - ExchangeImpl alternateExchange = currentQueue.getAlternateExchange(); + Exchange<?> alternateExchange = currentQueue.getAlternateExchange(); boolean autocommit = txn == null; int enqueues; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java deleted file mode 100644 index 3f701f652e..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ /dev/null @@ -1,26 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -interface QueueEntryListFactory -{ - public QueueEntryList createQueueEntryList(AMQQueue<?> queue); -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java new file mode 100644 index 0000000000..e56ff20c67 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; +import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; + +public class QueueFactory<X extends Queue<X>> implements ConfiguredObjectTypeFactory<X> +{ + private ConfiguredObjectFactory _configuredObjectFactory; + + @Override + public Class<? super X> getCategoryClass() + { + return Queue.class; + } + + @Override + public X create(final Map<String, Object> attributes, final ConfiguredObject<?>... parents) + { + return getQueueFactory(attributes).create(attributes, parents); + } + + @Override + public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectRecord record, + final ConfiguredObject<?>... parents) + { + return getQueueFactory(record.getAttributes()).recover(record, parents); + } + + private ConfiguredObjectTypeFactory<X> getQueueFactory(Map<String, Object> attributes) + { + + String type; + + if(attributes.containsKey(Port.TYPE)) + { + type = (String) attributes.get(Port.TYPE); + } + else + { + if(attributes.containsKey(PriorityQueue.PRIORITIES)) + { + type = "priority"; + } + else if(attributes.containsKey(SortedQueue.SORT_KEY)) + { + type = "sorted"; + } + else if(attributes.containsKey(LastValueQueue.LVQ_KEY)) + { + type = "lvq"; + } + else + { + type = "standard"; + } + } + + synchronized (this) + { + if(_configuredObjectFactory == null) + { + _configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance()); + } + } + return _configuredObjectFactory.getConfiguredObjectTypeFactory(Queue.class.getSimpleName(), type); + } + + @Override + public String getType() + { + return null; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 19f9f6c427..a91e1e8d58 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -28,7 +28,7 @@ public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X> { String SORT_KEY = "sortKey"; - @ManagedAttribute + @ManagedAttribute( automate = true, mandatory = true ) String getSortKey(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 92abe30442..efb5438214 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -39,11 +39,11 @@ public class SortedQueueEntryList implements QueueEntryList private final SortedQueueImpl _queue; private final String _propertyName; - public SortedQueueEntryList(final SortedQueueImpl queue, final String propertyName) + public SortedQueueEntryList(final SortedQueueImpl queue) { _queue = queue; _head = new SortedQueueEntry(this); - _propertyName = propertyName; + _propertyName = queue.getSortKey(); } public SortedQueueImpl getQueue() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java deleted file mode 100644 index 5cab126374..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -public class SortedQueueEntryListFactory implements QueueEntryListFactory -{ - - private final String _propertyName; - - public SortedQueueEntryListFactory(final String propertyName) - { - _propertyName = propertyName; - } - - @Override - public SortedQueueEntryList createQueueEntryList(final AMQQueue<?> queue) - { - return new SortedQueueEntryList((SortedQueueImpl) queue, _propertyName); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java new file mode 100644 index 0000000000..484e61efa9 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class SortedQueueFactory extends AbstractConfiguredObjectTypeFactory<SortedQueueImpl> +{ + public SortedQueueFactory() + { + super(SortedQueueImpl.class); + } + + @Override + protected SortedQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + + return new SortedQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java index c495b09d4a..d0bb79900d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -23,8 +23,8 @@ import java.util.Map; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements SortedQueue<SortedQueueImpl> @@ -33,30 +33,22 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements //monitor to prevent lock order issues with consumer sendLocks //and consumer updates in the super classes private final Object _sortedQueueLock = new Object(); - private final String _sortedPropertyName; - - protected SortedQueueImpl(VirtualHostImpl virtualHost, - Map<String, Object> attributes, - QueueEntryListFactory factory) - { - super(virtualHost, attributes, factory); - _sortedPropertyName = MapValueConverter.getStringAttribute(SORT_KEY,attributes); - } + @ManagedAttributeField + private String _sortKey; + private SortedQueueEntryList _entries; public SortedQueueImpl(VirtualHostImpl virtualHost, - Map<String, Object> attributes) + Map<String, Object> attributes) { - this(virtualHost, - attributes, - new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(SORT_KEY, attributes))); + super(virtualHost, attributes); } - - - public String getSortedPropertyName() + @Override + protected void onOpen() { - return _sortedPropertyName; + super.onOpen(); + _entries = new SortedQueueEntryList(this); } @Override @@ -70,20 +62,14 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements } @Override - public Object getAttribute(final String name) + SortedQueueEntryList getEntries() { - - if(SORT_KEY.equals(name)) - { - return getSortedPropertyName(); - } - - return super.getAttribute(name); + return _entries; } @Override public String getSortKey() { - return getSortedPropertyName(); + return _sortKey; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java index 341e2365c3..487bac1a43 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java @@ -45,13 +45,4 @@ public class StandardQueueEntryList extends OrderedQueueEntryList return new StandardQueueEntry(this, message); } - static class Factory implements QueueEntryListFactory - { - - public StandardQueueEntryList createQueueEntryList(AMQQueue<?> queue) - { - return new StandardQueueEntryList((StandardQueueImpl) queue); - } - } - } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java new file mode 100644 index 0000000000..1e2d8f63a6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class StandardQueueFactory extends AbstractConfiguredObjectTypeFactory<StandardQueueImpl> +{ + public StandardQueueFactory() + { + super(StandardQueueImpl.class); + } + + @Override + protected StandardQueueImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents) + { + VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents); + if (!(virtualHost instanceof VirtualHostImpl)) + { + throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName()); + } + + return new StandardQueueImpl((VirtualHostImpl<?,?,?>)virtualHost,attributes); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java index 4dc57b14a7..19c808a2f2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java @@ -26,9 +26,24 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class StandardQueueImpl extends AbstractQueue<StandardQueueImpl> implements StandardQueue<StandardQueueImpl> { + private StandardQueueEntryList _entries; + public StandardQueueImpl(final VirtualHostImpl virtualHost, final Map<String, Object> arguments) { - super(virtualHost, arguments, new StandardQueueEntryList.Factory()); + super(virtualHost, arguments); + } + + @Override + protected void onOpen() + { + super.onOpen(); + _entries = new StandardQueueEntryList(this); + } + + @Override + StandardQueueEntryList getEntries() + { + return _entries; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index cb3729e4e3..a532e2a749 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import javax.security.auth.login.AccountNotFoundException; @@ -48,7 +49,6 @@ import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.PreferencesProvider; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.User; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.AuthenticationResult; @@ -531,7 +531,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal private static Map<String, Object> createPrincipalAttributes(PrincipalDatabaseAuthenticationManager manager, final Principal user) { final Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(ID, UUIDGenerator.generateUserUUID(manager.getName(), user.getName())); + attributes.put(ID, UUID.randomUUID()); attributes.put(NAME, user.getName()); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java index 154b548cab..75fe2a6642 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java @@ -22,44 +22,84 @@ package org.apache.qpid.server.store; import java.util.UUID; -public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements DurableConfiguredObjectRecoverer +import org.apache.qpid.server.model.ConfiguredObject; + +public abstract class AbstractDurableConfiguredObjectRecoverer<T extends ConfiguredObject> implements DurableConfiguredObjectRecoverer { @Override public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer, final ConfiguredObjectRecord record) { - final UnresolvedObject obj = createUnresolvedObject(record); + final UnresolvedObject<T> obj = createUnresolvedObject(record); UnresolvedDependency[] dependencies = obj.getUnresolvedDependencies(); for(final UnresolvedDependency dependency : dependencies) { - Object dep; - if((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getId())) != null) + if(dependency.getId() != null) { - dependency.resolve(dep); + Object dep; + if ((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getId())) + != null) + { + dependency.resolve(dep); + } + else + { + durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(), + null, new DependencyListener() + { + + @Override + public void dependencyResolved(final String depType, + final UUID depId, + final ConfiguredObject o) + { + dependency.resolve(o); + if (obj.getUnresolvedDependencies().length + == 0) + { + durableConfigurationRecoverer.resolve( + getType(), + record.getId(), + obj.resolve()); + } + } + } + ); + } } else { - durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(), - new DependencyListener() - { + Object dep; + + if ((dep = durableConfigurationRecoverer.getResolvedObject(dependency.getType(), dependency.getName())) + != null) + { + dependency.resolve(dep); + } + else + { + durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(), + dependency.getName(), new DependencyListener() + { - @Override - public void dependencyResolved(final String depType, - final UUID depId, - final Object o) - { - dependency.resolve(o); - if (obj.getUnresolvedDependencies().length - == 0) - { - durableConfigurationRecoverer.resolve( - getType(), - record.getId(), - obj.resolve()); - } - } - } - ); + @Override + public void dependencyResolved(final String depType, + final UUID depId, + final ConfiguredObject o) + { + dependency.resolve(o); + if (obj.getUnresolvedDependencies().length + == 0) + { + durableConfigurationRecoverer.resolve( + getType(), + record.getId(), + obj.resolve()); + } + } + } + ); + } } } if(obj.getUnresolvedDependencies().length == 0) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java index 120c904cf7..2962e63152 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DependencyListener.java @@ -22,7 +22,9 @@ package org.apache.qpid.server.store; import java.util.UUID; +import org.apache.qpid.server.model.ConfiguredObject; + interface DependencyListener { - void dependencyResolved(String type, UUID id, Object o); + void dependencyResolved(String type, UUID id, ConfiguredObject o); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java index f8a8741a37..507e14f021 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java @@ -29,17 +29,19 @@ import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandler { private static final Logger _logger = Logger.getLogger(DurableConfigurationRecoverer.class); - private final Map<String, Map<UUID, Object>> _resolvedObjects = new HashMap<String, Map<UUID, Object>>(); + private final Map<String, Map<UUID, ConfiguredObject>> _resolvedObjects = new HashMap<String, Map<UUID, ConfiguredObject>>(); private final Map<String, Map<UUID, UnresolvedObject>> _unresolvedObjects = new HashMap<String, Map<UUID, UnresolvedObject>>(); @@ -47,6 +49,9 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl private final Map<String, Map<UUID, List<DependencyListener>>> _dependencyListeners = new HashMap<String, Map<UUID, List<DependencyListener>>>(); + private final Map<String, Map<String, List<DependencyListener>>> _dependencyNameListeners = + new HashMap<String, Map<String, List<DependencyListener>>>(); + private final Map<String, DurableConfiguredObjectRecoverer> _recoverers; private final UpgraderProvider _upgraderProvider; private final EventLogger _eventLogger; @@ -188,36 +193,72 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl void addResolutionListener(final String type, final UUID id, + final String name, final DependencyListener dependencyListener) { - Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type); - if(typeListeners == null) + if(id != null) { - typeListeners = new HashMap<UUID, List<DependencyListener>>(); - _dependencyListeners.put(type, typeListeners); + Map<UUID, List<DependencyListener>> typeListeners = _dependencyListeners.get(type); + if (typeListeners == null) + { + typeListeners = new HashMap<UUID, List<DependencyListener>>(); + _dependencyListeners.put(type, typeListeners); + } + List<DependencyListener> objectListeners = typeListeners.get(id); + if (objectListeners == null) + { + objectListeners = new ArrayList<DependencyListener>(); + typeListeners.put(id, objectListeners); + } + objectListeners.add(dependencyListener); } - List<DependencyListener> objectListeners = typeListeners.get(id); - if(objectListeners == null) + else { - objectListeners = new ArrayList<DependencyListener>(); - typeListeners.put(id, objectListeners); + Map<String, List<DependencyListener>> typeListeners = _dependencyNameListeners.get(type); + if (typeListeners == null) + { + typeListeners = new HashMap<String, List<DependencyListener>>(); + _dependencyNameListeners.put(type, typeListeners); + } + List<DependencyListener> objectListeners = typeListeners.get(name); + if (objectListeners == null) + { + objectListeners = new ArrayList<DependencyListener>(); + typeListeners.put(name, objectListeners); + } + objectListeners.add(dependencyListener); } - objectListeners.add(dependencyListener); - } Object getResolvedObject(final String type, final UUID id) { - Map<UUID, Object> objects = _resolvedObjects.get(type); + Map<UUID, ConfiguredObject> objects = _resolvedObjects.get(type); return objects == null ? null : objects.get(id); } - void resolve(final String type, final UUID id, final Object object) + Object getResolvedObject(final String type, final String name) + { + Map<UUID, ConfiguredObject> objects = _resolvedObjects.get(type); + if(objects != null) + { + for (ConfiguredObject object : objects.values()) + { + if(object.getName().equals(name)) + { + return object; + } + } + } + return null; + + } + + void resolve(final String type, final UUID id, final ConfiguredObject object) { - Map<UUID, Object> typeObjects = _resolvedObjects.get(type); + Map<UUID, ConfiguredObject> typeObjects = _resolvedObjects.get(type); if(typeObjects == null) { - typeObjects = new HashMap<UUID, Object>(); + typeObjects = new HashMap<UUID, ConfiguredObject>(); _resolvedObjects.put(type, typeObjects); } typeObjects.put(id, object); @@ -239,6 +280,19 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl } } } + + Map<String, List<DependencyListener>> typeNameListeners = _dependencyNameListeners.get(type); + if(typeNameListeners != null) + { + List<DependencyListener> listeners = typeNameListeners.remove(object.getName()); + if(listeners != null) + { + for(DependencyListener listener : listeners) + { + listener.dependencyResolved(type, id, object); + } + } + } } void addUnresolvedObject(final String type, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index 9410006d65..e7f380660c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -22,14 +22,11 @@ package org.apache.qpid.server.store; import java.util.Arrays; import java.util.HashSet; -import java.util.Map; - import java.util.Set; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.Binding; -import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; @@ -44,26 +41,12 @@ public class DurableConfigurationStoreHelper public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) { - Map<String, Object> attributesMap = queue.getActualAttributes(); - attributesMap.remove(ConfiguredObject.ID); - if(queue.getAlternateExchange() != null) - { - attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); - } - store.update(false, new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap)); + store.update(false, queue.asObjectRecord()); } public static void createQueue(DurableConfigurationStore store, AMQQueue<?> queue) { - - Map<String, Object> attributesMap = queue.getActualAttributes(); - attributesMap.remove(ConfiguredObject.ID); - if(queue.getAlternateExchange() != null) - { - attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); - } - - store.create(new ConfiguredObjectRecordImpl(queue.getId(), QUEUE, attributesMap)); + store.create(queue.asObjectRecord()); } public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java index 98348efbd2..29991054fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedDependency.java @@ -25,6 +25,7 @@ import java.util.UUID; public interface UnresolvedDependency<T> { public UUID getId(); + public String getName(); public String getType(); public void resolve(final T dependency); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 0484841aa9..b770fab698 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -70,13 +70,6 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.LastValueQueue; -import org.apache.qpid.server.queue.LastValueQueueImpl; -import org.apache.qpid.server.queue.PriorityQueue; -import org.apache.qpid.server.queue.PriorityQueueImpl; -import org.apache.qpid.server.queue.SortedQueue; -import org.apache.qpid.server.queue.SortedQueueImpl; -import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -101,6 +94,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; public static final String DLQ_ROUTING_KEY = "dlq"; + public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change private static final int MAX_LENGTH = 255; private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); @@ -195,7 +189,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if(attributes.get(ID) == null) { attributes = new HashMap<String, Object>(attributes); - attributes.put(ID, UUIDGenerator.generateVhostUUID((String)attributes.get(NAME))); + attributes.put(ID, UUID.randomUUID()); } return attributes; } @@ -608,6 +602,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException { checkVHostStateIsActive(); + AMQQueue<?> queue = addQueue(attributes); childAdded(queue); return queue; @@ -615,51 +610,34 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte private AMQQueue<?> addQueue(Map<String, Object> attributes) throws QueueExistsException { - - // make a copy as we may augment (with an ID for example) - attributes = new LinkedHashMap<String, Object>(attributes); - if (attributes.containsKey(Queue.TYPE)) + if (shouldCreateDLQ(attributes)) { - String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null); - QueueType queueType = null; - try - { - queueType = QueueType.valueOf(typeAttribute.toUpperCase()); - } - catch(Exception e) - { - throw new IllegalArgumentException("Unsupported queue type :" + typeAttribute); - } - if (queueType == QueueType.LVQ && attributes.get(LastValueQueue.LVQ_KEY) == null) - { - attributes.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY); - } - else if (queueType == QueueType.PRIORITY && attributes.get(PriorityQueue.PRIORITIES) == null) - { - attributes.put(PriorityQueue.PRIORITIES, 10); - } - else if (queueType == QueueType.SORTED && attributes.get(SortedQueue.SORT_KEY) == null) - { - throw new IllegalArgumentException("Sort key is not specified for sorted queue"); - } + // TODO - this isn't really correct - what if the name has ${foo} in it? + String queueName = String.valueOf(attributes.get(Queue.NAME)); + validateDLNames(queueName); + String altExchangeName = createDLQ(queueName); + attributes = new LinkedHashMap<String, Object>(attributes); + attributes.put(Queue.ALTERNATE_EXCHANGE, altExchangeName); } + return addQueueWithoutDLQ(attributes); + } + + private AMQQueue<?> addQueueWithoutDLQ(Map<String, Object> attributes) throws QueueExistsException + { + Broker<?> broker = getParent(Broker.class); - if(!attributes.containsKey(Queue.ID)) + ConfiguredObjectTypeFactory<? extends Queue> factory = + broker.getObjectFactory().getConfiguredObjectTypeFactory(Queue.class, attributes); + + try { - UUID id = UUID.randomUUID(); - attributes.put(Queue.ID, id); + return (AMQQueue) factory.create(attributes, this); } - - boolean createDLQ = shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled()); - if (createDLQ) + catch (DuplicateNameException e) { - // TODO - this isn't really correct - what if the name has ${foo} in it? - validateDLNames(String.valueOf(attributes.get(Queue.NAME))); + throw new QueueExistsException(getQueue(e.getName())); } - return createOrRestoreQueue(attributes, true); - - } @@ -1525,94 +1503,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } - // TODO - remove - public AMQQueue restoreQueue(Map<String, Object> attributes) - { - return createOrRestoreQueue(attributes, false); - - } - - - private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore) - { - String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes); - boolean createDLQ = createInStore && shouldCreateDLQ(attributes, getDefaultDeadLetterQueueEnabled()); - if (createDLQ) - { - validateDLNames(queueName); - } - - AMQQueue queue; - - try - { - - - if (attributes.containsKey(SortedQueue.SORT_KEY)) - { - queue = new SortedQueueImpl(this, attributes); - } - else if (attributes.containsKey(LastValueQueue.LVQ_KEY)) - { - queue = new LastValueQueueImpl(this, attributes); - } - else if (attributes.containsKey(PriorityQueue.PRIORITIES)) - { - queue = new PriorityQueueImpl(this, attributes); - } - else - { - queue = new StandardQueueImpl(this, attributes); - } - queue.open(); - } - catch(DuplicateNameException e) - { - - throw new QueueExistsException(e.getName(), getQueue(e.getName())); - } - - if(createDLQ) - { - createDLQ(queue); - } - else if(attributes != null && attributes.get(Queue.ALTERNATE_EXCHANGE) instanceof String) - { - - final String altExchangeAttr = (String) attributes.get(Queue.ALTERNATE_EXCHANGE); - ExchangeImpl altExchange; - try - { - altExchange = getExchange(UUID.fromString(altExchangeAttr)); - } - catch(IllegalArgumentException e) - { - altExchange = getExchange(altExchangeAttr); - } - queue.setAlternateExchange(altExchange); - } - - if (createInStore && queue.isDurable() && !(queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE - || queue.getLifetimePolicy() - == LifetimePolicy.DELETE_ON_SESSION_END)) - { - DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue); - } - - return queue; - } - - - - private void createDLQ(final AMQQueue queue) + private String createDLQ(final String queueName) { - final String queueName = queue.getName(); final String dlExchangeName = getDeadLetterExchangeName(queueName); final String dlQueueName = getDeadLetterQueueName(queueName); ExchangeImpl dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, getName()); + final UUID dlExchangeId = UUID.randomUUID(); try { @@ -1654,7 +1551,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc final Map<String, Object> args = new HashMap<String, Object>(); - args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(CREATE_DLQ_ON_CREATION, false); args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); try @@ -1664,7 +1561,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte args.put(Queue.ID, UUID.randomUUID()); args.put(Queue.NAME, dlQueueName); args.put(Queue.DURABLE, true); - dlQueue = createQueue(args); + dlQueue = addQueueWithoutDLQ(args); + childAdded(dlQueue); } catch (QueueExistsException e) { @@ -1681,7 +1579,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte //but we will make the key 'dlq' as it can be logged at creation. dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null); } - queue.setAlternateExchange(dlExchange); + return dlExchangeName; } private static void validateDLNames(String name) @@ -1701,8 +1599,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } - private static boolean shouldCreateDLQ(Map<String, Object> arguments, boolean virtualHostDefaultDeadLetterQueueEnabled) + private boolean shouldCreateDLQ(Map<String, Object> arguments) { + boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, arguments, @@ -1712,19 +1611,19 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { boolean dlqArgumentPresent = arguments != null - && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); + && arguments.containsKey(CREATE_DLQ_ON_CREATION); if (dlqArgumentPresent) { boolean dlqEnabled = true; if (dlqArgumentPresent) { - Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); + Object argument = arguments.get(CREATE_DLQ_ON_CREATION); dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) || (argument instanceof String && Boolean.parseBoolean(argument.toString())); } return dlqEnabled; } - return virtualHostDefaultDeadLetterQueueEnabled; + return isQueue_deadLetterQueueEnabled(); } return false; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index a976db05f6..6e399d950e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -123,6 +123,12 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B } @Override + public String getName() + { + return null; + } + + @Override public String getType() { return Queue.class.getSimpleName(); @@ -147,6 +153,12 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B } @Override + public String getName() + { + return null; + } + + @Override public String getType() { return org.apache.qpid.server.model.Exchange.class.getSimpleName(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java index 4bf7635513..9347b97606 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java @@ -28,7 +28,6 @@ import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Exchange; -import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -46,8 +45,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer< { _vhost = vhost; Broker<?> broker = _vhost.getParent(Broker.class); - SystemContext<?> systemContext = broker.getParent(SystemContext.class); - _objectFactory = systemContext.getObjectFactory(); + _objectFactory = broker.getObjectFactory(); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java index 5bec8c1457..ef9046e54e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueExistsException.java @@ -26,6 +26,11 @@ public class QueueExistsException extends RuntimeException { private final AMQQueue _existing; + public QueueExistsException(AMQQueue existing) + { + this(existing.getName(), existing); + } + public QueueExistsException(String name, AMQQueue existing) { super(name); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 1841f1e45f..9606a44acb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.virtualhost; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,21 +29,28 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.UnresolvedConfiguredObject; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQQueue> { private static final Logger _logger = Logger.getLogger(QueueRecoverer.class); - private final VirtualHostImpl _virtualHost; + private final VirtualHostImpl<?,?,?> _virtualHost; + private final ConfiguredObjectFactory _objectFactory; public QueueRecoverer(final VirtualHostImpl virtualHost) { _virtualHost = virtualHost; + Broker<?> broker = _virtualHost.getParent(Broker.class); + _objectFactory = broker.getObjectFactory(); } @Override @@ -55,33 +62,42 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ @Override public UnresolvedObject<AMQQueue> createUnresolvedObject(final ConfiguredObjectRecord record) { - return new UnresolvedQueue(record.getId(), record.getAttributes()); + return new UnresolvedQueue(record); } private class UnresolvedQueue implements UnresolvedObject<AMQQueue> { - private final Map<String, Object> _attributes; - private final UUID _alternateExchangeId; - private final UUID _id; + + // private final UUID _alternateExchangeId; + private final ConfiguredObjectRecord _record; private AMQQueue _queue; private List<UnresolvedDependency> _dependencies = new ArrayList<UnresolvedDependency>(); private ExchangeImpl _alternateExchange; + private UUID _alternateExchangeId; + private String _alternateExchangeName; - public UnresolvedQueue(final UUID id, - final Map<String, Object> attributes) + public UnresolvedQueue(ConfiguredObjectRecord record) { - _attributes = attributes; - _alternateExchangeId = _attributes.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String) _attributes - .get(Queue.ALTERNATE_EXCHANGE)); - _id = id; - if (_alternateExchangeId != null) + _record = record; + Object altExchObj = record.getAttributes().get(Queue.ALTERNATE_EXCHANGE); + if(altExchObj instanceof UUID) + { + _alternateExchangeId = (UUID) altExchObj; + _dependencies.add(new AlternateExchangeDependency()); + } + else if (altExchObj instanceof String) { - _alternateExchange = _virtualHost.getExchange(_alternateExchangeId); - if(_alternateExchange == null) + try + { + _alternateExchangeId = UUID.fromString((String)altExchObj); + } + catch (IllegalArgumentException e) { - _dependencies.add(new AlternateExchangeDependency()); + _alternateExchangeName = (String) altExchObj; } + _dependencies.add(new AlternateExchangeDependency()); } + } @Override @@ -93,9 +109,9 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ @Override public AMQQueue resolve() { - String queueName = (String) _attributes.get(Queue.NAME); + String queueName = (String) _record.getAttributes().get(Queue.NAME); - _queue = _virtualHost.getQueue(_id); + _queue = _virtualHost.getQueue(_record.getId()); if(_queue == null) { _queue = _virtualHost.getQueue(queueName); @@ -103,11 +119,17 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ if (_queue == null) { - Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes); - attributes.put(Queue.ID, _id); - attributes.put(Queue.DURABLE, true); - _queue = _virtualHost.restoreQueue(attributes); + Map<String,Object> attributesWithId = new HashMap<String,Object>(_record.getAttributes()); + attributesWithId.put(Queue.ID,_record.getId()); + attributesWithId.put(Queue.DURABLE,true); + + ConfiguredObjectTypeFactory<? extends Queue> configuredObjectTypeFactory = + _objectFactory.getConfiguredObjectTypeFactory(Queue.class, attributesWithId); + UnresolvedConfiguredObject<? extends Queue> unresolvedConfiguredObject = + configuredObjectTypeFactory.recover(_record, _virtualHost); + _queue = (AMQQueue<?>) unresolvedConfiguredObject.resolve(); } + _queue.open(); return _queue; } @@ -120,6 +142,12 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ } @Override + public String getName() + { + return _alternateExchangeName; + } + + @Override public String getType() { return "Exchange"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index afb65fa326..d0737e8311 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -128,6 +128,4 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM EventLogger getEventLogger(); - // TODO - remove - public AMQQueue restoreQueue(Map<String, Object> attributes); } diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory index a5dfec8a29..cc000d49e6 100644 --- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory +++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory @@ -35,6 +35,11 @@ org.apache.qpid.server.model.adapter.BrokerAdapterFactory org.apache.qpid.server.model.adapter.StandardVirtualHostFactory org.apache.qpid.server.model.adapter.FileBasedGroupProviderFactory org.apache.qpid.server.model.adapter.FileSystemPreferencesProviderFactory +org.apache.qpid.server.queue.LastValueQueueFactory +org.apache.qpid.server.queue.PriorityQueueFactory +org.apache.qpid.server.queue.QueueFactory +org.apache.qpid.server.queue.SortedQueueFactory +org.apache.qpid.server.queue.StandardQueueFactory org.apache.qpid.server.exchange.DirectExchangeFactory org.apache.qpid.server.exchange.FanoutExchangeFactory org.apache.qpid.server.exchange.HeadersExchangeFactory diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index c62b541191..5fd8de9ac9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -709,8 +709,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase private AbstractQueue createNonAsyncDeliverQueue() { - TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - return new NonAsyncDeliverQueue(factory, getVirtualHost()); + return new NonAsyncDeliverQueue(getVirtualHost()); } /** @@ -830,7 +829,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase QueueNotificationListener listener = mock(QueueNotificationListener .class); _queue.setNotificationListener(listener); - _queue.setAlertThresholdQueueDepthMessages(2); + _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + Integer.valueOf(2))); _queue.enqueue(createMessage(new Long(24)), null); verifyZeroInteractions(listener); @@ -849,7 +849,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(createMessage(new Long(26)), null); _queue.setNotificationListener(listener); - _queue.setAlertThresholdQueueDepthMessages(2); + _queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + Integer.valueOf(2))); verifyZeroInteractions(listener); @@ -1046,17 +1047,6 @@ abstract class AbstractQueueTestBase extends QpidTestCase return _consumerTarget; } - - static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory - { - - @Override - public NonAsyncDeliverList createQueueEntryList(final AMQQueue<?> queue) - { - return new NonAsyncDeliverList((NonAsyncDeliverQueue) queue); - } - } - private static class NonAsyncDeliverEntry extends OrderedQueueEntry { @@ -1107,9 +1097,23 @@ abstract class AbstractQueueTestBase extends QpidTestCase private static class NonAsyncDeliverQueue extends AbstractQueue<NonAsyncDeliverQueue> { - public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHostImpl vhost) + private QueueEntryList _entries = new NonAsyncDeliverList(this); + + public NonAsyncDeliverQueue(VirtualHostImpl vhost) + { + super(vhost, attributes()); + } + + @Override + protected void onOpen() + { + super.onOpen(); + } + + @Override + QueueEntryList getEntries() { - super(vhost, attributes(), factory); + return _entries; } private static Map<String,Object> attributes() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index bc1d89a280..dfb6f0ee8c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -59,7 +59,8 @@ public class LastValueQueueListTest extends TestCase when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); _queue = new LastValueQueueImpl(virtualHost, queueAttributes); - _list = (LastValueQueueList) _queue.getEntries(); + _queue.open(); + _list = _queue.getEntries(); } public void testListHasNoEntries() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index ad677a98a7..80e8252cb3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -59,7 +59,8 @@ public class PriorityQueueListTest extends QpidTestCase when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); PriorityQueueImpl queue = new PriorityQueueImpl(virtualHost, queueAttributes); - _list = (PriorityQueueList) queue.getEntries(); + queue.open(); + _list = queue.getEntries(); for (int i = 0; i < PRIORITIES.length; i++) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index a5a25994ca..80a405d112 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -18,24 +18,24 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import junit.framework.TestCase; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.MessageInstance.EntryState; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Tests for {@link QueueEntryImpl} */ @@ -200,7 +200,8 @@ public abstract class QueueEntryImplTestBase extends TestCase when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); - OrderedQueueEntryList queueEntryList = (OrderedQueueEntryList) queue.getEntries(); + queue.open(); + OrderedQueueEntryList queueEntryList = queue.getEntries(); // create test entries for(int i = 0; i < numberOfEntries ; i++) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java index dc4609734d..42462d75c4 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -30,9 +30,9 @@ import org.apache.qpid.server.queue.SortedQueueEntry.Colour; */ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList { - public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue, String propertyName) + public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue) { - super(queue, propertyName); + super(queue); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 61e396ac27..26220cb8fa 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -20,18 +20,18 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase { @@ -48,8 +48,8 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); - - queueEntryList = (OrderedQueueEntryList) queue.getEntries(); + queue.open(); + queueEntryList = queue.getEntries(); super.setUp(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index bcae391b92..d284324667 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -86,15 +86,23 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _testQueue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory() + _testQueue = new SortedQueueImpl(virtualHost, attributes) { + SelfValidatingSortedQueueEntryList _entries; + @Override + protected void onOpen() + { + super.onOpen(); + _entries = new SelfValidatingSortedQueueEntryList(this); + } @Override - public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) + SelfValidatingSortedQueueEntryList getEntries() { - return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY"); + return _entries; } - }); + }; + _testQueue.open(); _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries(); super.setUp(); @@ -124,7 +132,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase { if(newList) { - return new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + return new SelfValidatingSortedQueueEntryList(_testQueue); } else { @@ -195,7 +203,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testNonUniqueSortKeys() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); // Build test list long messageId = 0L; @@ -216,7 +224,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testNullSortKeys() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); // Build test list long messageId = 0L; @@ -237,7 +245,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testAscendingSortKeys() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); // Build test list long messageId = 0L; @@ -260,7 +268,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testDescendingSortKeys() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); // Build test list long messageId = 0L; @@ -283,7 +291,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testInsertAfter() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "A"); _sqel.add(msg); @@ -303,7 +311,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testInsertBefore() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "B"); _sqel.add(msg); @@ -323,7 +331,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testInsertInbetween() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "A"); _sqel.add(msg); @@ -354,7 +362,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase public void testInsertAtHead() throws Exception { - _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY"); + _sqel = new SelfValidatingSortedQueueEntryList(_testQueue); ServerMessage msg = generateTestMessage(1, "B"); _sqel.add(msg); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 90c4a82747..1a26d2e2ec 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -56,15 +56,23 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - SortedQueueImpl queue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory() + SortedQueueImpl queue = new SortedQueueImpl(virtualHost, attributes) { + SelfValidatingSortedQueueEntryList _entries; + @Override + protected void onOpen() + { + super.onOpen(); + _entries = new SelfValidatingSortedQueueEntryList(this); + } @Override - public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) + SelfValidatingSortedQueueEntryList getEntries() { - return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY"); + return _entries; } - }); + }; + queue.open(); _queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries(); super.setUp(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 2bab20a1b0..2a6e0d0863 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class StandardQueueEntryListTest extends QueueEntryListTestBase { @@ -56,8 +56,8 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); _testQueue = new StandardQueueImpl(virtualHost, queueAttributes); - - _sqel = (StandardQueueEntryList) _testQueue.getEntries(); + _testQueue.open(); + _sqel = _testQueue.getEntries(); for(int i = 1; i <= 100; i++) { final ServerMessage message = mock(ServerMessage.class); @@ -102,8 +102,8 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); - - return (StandardQueueEntryList) queue.getEntries(); + queue.open(); + return queue.getEntries(); } else { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index f6d04175c5..db1537bcc5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -249,9 +249,17 @@ public class StandardQueueTest extends AbstractQueueTestBase private static class DequeuedQueue extends AbstractQueue { + private QueueEntryList _entries = new DequeuedQueueEntryList(this); + public DequeuedQueue(VirtualHostImpl virtualHost) { - super(virtualHost, attributes(), new DequeuedQueueEntryListFactory()); + super(virtualHost, attributes()); + } + + @Override + QueueEntryList getEntries() + { + return _entries; } private static Map<String,Object> attributes() @@ -264,19 +272,6 @@ public class StandardQueueTest extends AbstractQueueTestBase return attributes; } } - private static class DequeuedQueueEntryListFactory implements QueueEntryListFactory - { - public DequeuedQueueEntryList createQueueEntryList(AMQQueue queue) - { - /** - * Override SimpleQueueEntryList to create a dequeued - * entries for messages with even id - */ - return new DequeuedQueueEntryList((DequeuedQueue) queue); - } - - - } private static class DequeuedQueueEntryList extends OrderedQueueEntryList { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index ea5e02e4d9..b793a9182e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -53,6 +53,7 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; @@ -286,7 +287,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueueFieldTable() throws Exception { Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); @@ -335,7 +336,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { // create queue Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); @@ -361,7 +362,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { // create queue Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); DurableConfigurationStoreHelper.createQueue(_configStore, queue); @@ -387,7 +388,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { // create queue Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); DurableConfigurationStoreHelper.createQueue(_configStore, queue); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 95de8fd9e5..2822476b1c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -67,7 +67,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.UnresolvedConfiguredObject; -import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.test.utils.QpidTestCase; public class DurableConfigurationRecovererTest extends QpidTestCase @@ -84,6 +83,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationStore _store; private ConfiguredObjectFactory _configuredObjectFactory; private ConfiguredObjectTypeFactory _exchangeFactory; + private ConfiguredObjectTypeFactory _queueFactory; @Override public void setUp() throws Exception @@ -91,6 +91,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase super.setUp(); _configuredObjectFactory = mock(ConfiguredObjectFactory.class); _exchangeFactory = mock(ConfiguredObjectTypeFactory.class); + _queueFactory = mock(ConfiguredObjectTypeFactory.class); + AMQQueue<?> queue = mock(AMQQueue.class); @@ -106,6 +108,8 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory); + when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Queue.class), anyMap())).thenReturn(_queueFactory); + final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); doAnswer(new Answer() @@ -131,52 +135,38 @@ public class DurableConfigurationRecovererTest extends QpidTestCase - final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class); - when(_vhost.restoreQueue(attributesArg.capture())).then( - new Answer() - { + final ArgumentCaptor<ConfiguredObjectRecord> recoveredQueue = ArgumentCaptor.forClass(ConfiguredObjectRecord.class); + doAnswer(new Answer() + { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - final AMQQueue queue = mock(AMQQueue.class); - - final Map attributes = attributesArg.getValue(); - final String queueName = (String) attributes.get(Queue.NAME); - final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); - - when(queue.getName()).thenReturn(queueName); - when(queue.getId()).thenReturn(queueId); - when(_vhost.getQueue(eq(queueName))).thenReturn(queue); - when(_vhost.getQueue(eq(queueId))).thenReturn(queue); - - final ArgumentCaptor<ExchangeImpl> altExchangeArg = ArgumentCaptor.forClass(ExchangeImpl.class); - doAnswer( - new Answer() - { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable - { - final ExchangeImpl value = altExchangeArg.getValue(); - when(queue.getAlternateExchange()).thenReturn(value); - return null; - } - } - ).when(queue).setAlternateExchange(altExchangeArg.capture()); - - Map args = attributes; - if (args.containsKey(Queue.ALTERNATE_EXCHANGE)) - { - final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); - final ExchangeImpl exchange = - (ExchangeImpl) _vhost.getExchange(exchangeId); - queue.setAlternateExchange(exchange); - } - return queue; - } - }); + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + ConfiguredObjectRecord queueRecord = recoveredQueue.getValue(); + AMQQueue queue = mock(AMQQueue.class); + UUID id = queueRecord.getId(); + String name = (String) queueRecord.getAttributes().get("name"); + when(queue.getId()).thenReturn(id); + when(queue.getName()).thenReturn(name); + when(_vhost.getQueue(eq(id))).thenReturn(queue); + when(_vhost.getQueue(eq(name))).thenReturn(queue); + UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class); + when(unresolved.resolve()).thenReturn(queue); + + Map args = queueRecord.getAttributes(); + if (args.containsKey(Queue.ALTERNATE_EXCHANGE)) + { + final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); + final ExchangeImpl exchange = + _vhost.getExchange(exchangeId); + when(queue.getAlternateExchange()).thenReturn(exchange); + } + + return unresolved; + } + }).when(_queueFactory).recover(recoveredQueue.capture(), any(ConfiguredObject.class)); DurableConfiguredObjectRecoverer[] recoverers = { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index a58a251fbe..85eede527a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -652,12 +652,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override - public AMQQueue restoreQueue(final Map<String, Object> attributes) - { - return null; - } - - @Override public boolean getDefaultDeadLetterQueueEnabled() { return false; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java index 5313b416c0..6a27946c29 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; @@ -153,21 +154,21 @@ public class VirtualHostQueueCreationTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); AMQQueue queue = _virtualHost.createQueue(attributes); - ExchangeImpl altExchange = queue.getAlternateExchange(); + Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); - assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType()); assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); - assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertTrue("DLQ should have been bound to the alternate exchange", ((ExchangeImpl)altExchange).isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts()); @@ -192,23 +193,23 @@ public class VirtualHostQueueCreationTest extends QpidTestCase Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5); AMQQueue queue = _virtualHost.createQueue(attributes); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts()); - ExchangeImpl altExchange = queue.getAlternateExchange(); + Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); - assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getTypeName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType()); assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); - assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertTrue("DLQ should have been bound to the alternate exchange", ((ExchangeImpl)altExchange).isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts()); @@ -234,7 +235,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, false); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, false); AMQQueue queue = _virtualHost.createQueue(attributes); @@ -266,7 +267,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); //create an autodelete queue @@ -338,7 +339,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase catch (Exception e) { assertTrue(e instanceof IllegalArgumentException); - assertEquals("Value for attribute name is not found", e.getMessage()); + assertTrue(e.getMessage().startsWith("The name attribute is mandatory")); } } @@ -359,7 +360,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); _virtualHost.createQueue(attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); @@ -389,7 +390,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, queueName); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, (Object) true); _virtualHost.createQueue(attributes); fail("queue with DLE name having more than 255 characters can not be created!"); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index a3fabf076c..d73d019000 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,28 +20,37 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.transport.*; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Option; public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { @@ -405,7 +414,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(owningResource instanceof AMQQueue) { final AMQQueue queue = (AMQQueue)owningResource; - final ExchangeImpl alternateExchange = queue.getAlternateExchange(); + final Exchange alternateExchange = queue.getAlternateExchange(); if(alternateExchange != null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 70094ea7c7..1f108ec3e9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -63,7 +63,6 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -86,6 +85,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -1616,7 +1616,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { final AMQQueue queue = (AMQQueue) owningResource; - final ExchangeImpl altExchange = queue.getAlternateExchange(); + final Exchange altExchange = queue.getAlternateExchange(); if (altExchange == null) { diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index e95ed9a383..a9fc160618 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.jmx.mbeans; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; @@ -294,14 +295,19 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN { if (exchangeName == null || "".equals(exchangeName)) { - _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), null); + _queue.setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null)); } else { - VirtualHost<?,?,?> virtualHost = _queue.getParent(VirtualHost.class); - Exchange exchange = MBeanUtils.findExchangeFromExchangeName(virtualHost, exchangeName); + try + { - _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), exchange); + _queue.setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, exchangeName)); + } + catch (IllegalArgumentException e) + { + throw new OperationsException("No such exchange \""+exchangeName+"\""); + } } } diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index a81de49fb9..6a49b7c4ed 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -19,8 +19,10 @@ package org.apache.qpid.server.jmx.mbeans; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -320,7 +322,7 @@ public class QueueMBeanTest extends QpidTestCase when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost); _queueMBean.setAlternateExchange("exchange2"); - verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, mockExchange2); + verify(_mockQueue).setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "exchange2")); } public void testSetAlternateExchangeWithUnknownExchangeName() throws Exception @@ -331,7 +333,8 @@ public class QueueMBeanTest extends QpidTestCase VirtualHost mockVirtualHost = mock(VirtualHost.class); when(mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange)); when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost); - + doThrow(new IllegalArgumentException()).when(_mockQueue).setAttributes( + eq(Collections.<String, Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "notknown"))); try { _queueMBean.setAlternateExchange("notknown"); @@ -346,7 +349,7 @@ public class QueueMBeanTest extends QpidTestCase public void testRemoveAlternateExchange() throws Exception { _queueMBean.setAlternateExchange(""); - verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null); + verify(_mockQueue).setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null)); } /********** Operations **********/ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java index 555c4dd20d..c2ea420e4b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/PersistentStoreTest.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.store; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.ArrayList; +import java.util.List; import javax.jms.Connection; import javax.jms.Destination; @@ -30,8 +30,9 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; -import java.util.ArrayList; -import java.util.List; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; public class PersistentStoreTest extends QpidBrokerTestCase { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 9949102af8..d37fb4d90f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -562,7 +562,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase else if (lastValueQueue) { assertEquals("Queue is no longer a LastValue Queue", LastValueQueueImpl.class, queue.getClass()); - assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((LastValueQueueImpl) queue).getConflationKey()); + assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((LastValueQueueImpl) queue).getLvqKey()); } else { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 3bd91faa3e..a6a08d83f9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -18,20 +18,19 @@ */ package org.apache.qpid.systest.management.jmx; -import org.apache.commons.lang.time.FastDateFormat; - -import org.apache.log4j.Logger; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.management.common.mbeans.ManagedBroker; -import org.apache.qpid.management.common.mbeans.ManagedQueue; -import org.apache.qpid.server.queue.NotificationCheckTest; -import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.queue.StandardQueueImpl; -import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; -import org.apache.qpid.test.utils.JMXTestUtils; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.Destination; @@ -48,19 +47,20 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; import javax.naming.NamingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.queue.NotificationCheckTest; +import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.queue.StandardQueueImpl; +import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** * Tests the JMX API for the Managed Queue. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java index 7a954c0185..9246877528 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java @@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Group; import org.apache.qpid.server.model.GroupProvider; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.adapter.FileBasedGroupProvider; import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl; import org.apache.qpid.test.utils.TestBrokerConfiguration; @@ -354,7 +353,6 @@ public class GroupProviderRestTest extends QpidRestTestCase assertNotNull("Attribute " + Group.NAME, groupName); assertNotNull("Attribute " + Group.ID, group.get(Group.ID)); - assertEquals("Attribute " + Group.ID, UUIDGenerator.generateGroupUUID(name, groupName).toString(), group.get(Group.ID)); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 4535425ea4..f336c7fc77 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -38,10 +38,10 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.LastValueQueue; -import org.apache.qpid.server.queue.LastValueQueueImpl; import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.util.FileUtils; @@ -274,7 +274,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "lvq", lvqQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected lvq key attribute", LastValueQueueImpl.DEFAULT_LVQ_KEY, lvqQueue.get(LastValueQueue.LVQ_KEY)); + assertEquals("Unexpected lvq key attribute", LastValueQueue.DEFAULT_LVQ_KEY, lvqQueue.get(LastValueQueue.LVQ_KEY)); } public void testPutCreateSortedQueueWithoutKey() throws Exception @@ -443,15 +443,15 @@ public class VirtualHostRestTest extends QpidRestTestCase String queueName = getTestQueueName(); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(Queue.CREATE_DLQ_ON_CREATION, true); + attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true); //verify the starting state Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); List<Map<String, Object>> queues = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_QUEUES_ATTRIBUTE); List<Map<String, Object>> exchanges = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_EXCHANGES_ATTRIBUTE); - assertNull("queue should not have already been present", getRestTestHelper().find(Queue.NAME, queueName , queues)); - assertNull("queue should not have already been present", getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues)); + assertNull("queue "+ queueName + " should not have already been present", getRestTestHelper().find(Queue.NAME, queueName , queues)); + assertNull("queue "+ queueName + "_DLQ should not have already been present", getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues)); assertNull("exchange should not have already been present", getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges)); //create the queue @@ -465,9 +465,9 @@ public class VirtualHostRestTest extends QpidRestTestCase Map<String, Object> queue = getRestTestHelper().find(Queue.NAME, queueName , queues); Map<String, Object> dlqQueue = getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues); Map<String, Object> dlExchange = getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges); - assertNotNull("queue should not have been present", queue); - assertNotNull("queue should not have been present", dlqQueue); - assertNotNull("exchange should not have been present", dlExchange); + assertNotNull("queue should have been present", queue); + assertNotNull("queue should have been present", dlqQueue); + assertNotNull("exchange should have been present", dlExchange); //verify that the alternate exchange is set as expected on the new queue Map<String, Object> queueAttributes = new HashMap<String, Object>(); |
