diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-04-17 12:38:47 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-04-17 12:38:47 +0000 |
| commit | 672f3f7ab44bb4666deb95e80008a9d3e7b35806 (patch) | |
| tree | 0889a31ee8cea09b84cb55e00d3ed02f2647b8d5 | |
| parent | 90a9dc3aae1bc0a755e032f472e1b37e6508b737 (diff) | |
| download | qpid-python-672f3f7ab44bb4666deb95e80008a9d3e7b35806.tar.gz | |
QPID-5580 : [Java Broker] Introduce explicit type hierarchy for queues in the ConfiguredObject model
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588234 13f79535-47bb-0310-9956-ffa450edef68
42 files changed, 454 insertions, 346 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index eff15c4905..fed8862782 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -24,8 +24,7 @@ import java.util.Collection; import org.apache.qpid.server.queue.QueueEntryVisitor; -@ManagedObject - +@ManagedObject( defaultType = "standard" ) public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> { @@ -39,23 +38,16 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String MESSAGE_GROUP_KEY = "messageGroupKey"; String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; - String LVQ_KEY = "lvqKey"; String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; String NO_LOCAL = "noLocal"; String OWNER = "owner"; String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; String QUEUE_FLOW_STOPPED = "queueFlowStopped"; - String SORT_KEY = "sortKey"; - String QUEUE_TYPE = "queueType"; - String PRIORITIES = "priorities"; String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change @ManagedAttribute - String getQueueType(); - - @ManagedAttribute Exchange getAlternateExchange(); @ManagedAttribute( automate = true, defaultValue = "NONE" ) @@ -67,11 +59,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute boolean getNoLocal(); - @ManagedAttribute - String getLvqKey(); - - @ManagedAttribute - String getSortKey(); @ManagedAttribute String getMessageGroupKey(); @@ -135,8 +122,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute( automate = true, defaultValue = "${queue.alertRepeatGap}") long getAlertRepeatGap(); - @ManagedAttribute - int getPriorities(); //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 9a06569397..9a051be324 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -82,21 +82,21 @@ public class AMQQueueFactory implements QueueFactory AMQQueue queue; - if(attributes.containsKey(Queue.SORT_KEY)) + if(attributes.containsKey(SortedQueue.SORT_KEY)) { - queue = new SortedQueue(_virtualHost, attributes); + queue = new SortedQueueImpl(_virtualHost, attributes); } - else if(attributes.containsKey(Queue.LVQ_KEY)) + else if(attributes.containsKey(LastValueQueue.LVQ_KEY)) { - queue = new ConflationQueue(_virtualHost, attributes); + queue = new LastValueQueueImpl(_virtualHost, attributes); } - else if(attributes.containsKey(Queue.PRIORITIES)) + else if(attributes.containsKey(PriorityQueue.PRIORITIES)) { - queue = new PriorityQueue(_virtualHost, attributes); + queue = new PriorityQueueImpl(_virtualHost, attributes); } else { - queue = new StandardQueue(_virtualHost, attributes); + queue = new StandardQueueImpl(_virtualHost, attributes); } queue.open(); //Register the new queue diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 7738281034..c015bd6d7c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -89,9 +89,9 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -public abstract class AbstractQueue - extends AbstractConfiguredObject<AbstractQueue> - implements AMQQueue<AbstractQueue>, +public abstract class AbstractQueue<X extends AbstractQueue<X>> + extends AbstractConfiguredObject<X> + implements AMQQueue<X>, StateChangeListener<QueueConsumer<?>, State>, MessageGroupManager.ConsumerResetHelper { @@ -543,40 +543,10 @@ public abstract class AbstractQueue //We only return the boolean value if message groups are actually in use return _arguments.get(MESSAGE_GROUP_KEY) == null ? null : _arguments.get(MESSAGE_GROUP_SHARED_GROUPS); } - else if(LVQ_KEY.equals(name)) - { - if(this instanceof ConflationQueue) - { - return ((ConflationQueue)this).getConflationKey(); - } - } else if(QUEUE_FLOW_STOPPED.equals(name)) { return isOverfull(); } - else if(SORT_KEY.equals(name)) - { - if(this instanceof SortedQueue) - { - return ((SortedQueue)this).getSortedPropertyName(); - } - } - else if(QUEUE_TYPE.equals(name)) - { - if(this instanceof SortedQueue) - { - return "sorted"; - } - if(this instanceof ConflationQueue) - { - return "lvq"; - } - if(this instanceof PriorityQueue) - { - return "priority"; - } - return "standard"; - } else if(STATE.equals(name)) { return State.ACTIVE; // TODO @@ -585,13 +555,6 @@ public abstract class AbstractQueue { return getDescription(); } - else if(PRIORITIES.equals(name)) - { - if(this instanceof PriorityQueue) - { - return ((PriorityQueue)this).getPriorities(); - } - } return super.getAttribute(name); } @@ -2679,12 +2642,6 @@ public abstract class AbstractQueue } @Override - public String getQueueType() - { - return null; - } - - @Override public ExclusivityPolicy getExclusive() { return _exclusive; @@ -2697,18 +2654,6 @@ public abstract class AbstractQueue } @Override - public String getLvqKey() - { - return null; - } - - @Override - public String getSortKey() - { - return null; - } - - @Override public String getMessageGroupKey() { return (String) getAttribute(MESSAGE_GROUP_KEY); @@ -2728,13 +2673,6 @@ public abstract class AbstractQueue } @Override - public int getPriorities() - { - return 0; - } - - - @Override public State getState() { return isDeleted() ? State.DELETED : State.ACTIVE; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java new file mode 100644 index 0000000000..c97a789f65 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; + +@ManagedObject( category = false, type="lvq" ) +public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X> +{ + String LVQ_KEY = "lvqKey"; + + @ManagedAttribute + String getLvqKey(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java index 9b5b7b23cb..9ee9125bcf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java @@ -23,35 +23,52 @@ package org.apache.qpid.server.queue; import java.util.Map; -import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -public class ConflationQueue extends AbstractQueue +public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implements LastValueQueue<LastValueQueueImpl> { public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; - protected ConflationQueue(VirtualHostImpl virtualHost, - Map<String, Object> attributes) + protected LastValueQueueImpl(VirtualHostImpl virtualHost, + Map<String, Object> attributes) { super(virtualHost, attributes, entryList(attributes)); } - private static ConflationQueueList.Factory entryList(final Map<String, Object> attributes) + private static LastValueQueueList.Factory entryList(final Map<String, Object> attributes) { - String conflationKey = MapValueConverter.getStringAttribute(Queue.LVQ_KEY, + String conflationKey = MapValueConverter.getStringAttribute(LVQ_KEY, attributes, DEFAULT_LVQ_KEY); // conflation key can still be null if it was present in the map with a null value - return new ConflationQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); + return new LastValueQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); } public String getConflationKey() { - return ((ConflationQueueList)getEntries()).getConflationKey(); + return ((LastValueQueueList)getEntries()).getConflationKey(); } + @Override + public Object getAttribute(final String name) + { + if(LVQ_KEY.equals(name)) + { + if(this instanceof LastValueQueueImpl) + { + return getConflationKey(); + } + } + return super.getAttribute(name); + } + + @Override + public String getLvqKey() + { + return getConflationKey(); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java index c234b9cbb3..cd586e9629 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java @@ -21,20 +21,21 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -public class ConflationQueueList extends OrderedQueueEntryList +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; + +public class LastValueQueueList extends OrderedQueueEntryList { - private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueList.class); private static final HeadCreator HEAD_CREATOR = new HeadCreator() { @@ -42,7 +43,7 @@ public class ConflationQueueList extends OrderedQueueEntryList @Override public ConflationQueueEntry createHead(final QueueEntryList list) { - return ((ConflationQueueList)list).createHead(); + return ((LastValueQueueList)list).createHead(); } }; @@ -53,7 +54,7 @@ public class ConflationQueueList extends OrderedQueueEntryList private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this); private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this); - public ConflationQueueList(ConflationQueue queue, String conflationKey) + public LastValueQueueList(LastValueQueueImpl queue, String conflationKey) { super(queue, HEAD_CREATOR); _conflationKey = conflationKey; @@ -199,12 +200,12 @@ public class ConflationQueueList extends OrderedQueueEntryList private AtomicReference<ConflationQueueEntry> _latestValueReference; - private ConflationQueueEntry(final ConflationQueueList queueEntryList) + private ConflationQueueEntry(final LastValueQueueList queueEntryList) { super(queueEntryList); } - public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message) + public ConflationQueueEntry(LastValueQueueList queueEntryList, ServerMessage message) { super(queueEntryList, message); } @@ -264,9 +265,9 @@ public class ConflationQueueList extends OrderedQueueEntryList } @Override - public ConflationQueueList createQueueEntryList(final AMQQueue<?> queue) + public LastValueQueueList createQueueEntryList(final AMQQueue<?> queue) { - return new ConflationQueueList((ConflationQueue)queue, _conflationKey); + return new LastValueQueueList((LastValueQueueImpl)queue, _conflationKey); } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 3ecf134b5f..0797bbd4e9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - import java.util.Map; -public abstract class OutOfOrderQueue extends AbstractQueue +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public abstract class OutOfOrderQueue<X extends OutOfOrderQueue<X>> extends AbstractQueue<X> { protected OutOfOrderQueue(VirtualHostImpl virtualHost, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java index 04e94f24f6..8e8732d595 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java @@ -1,52 +1,33 @@ /* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; -import java.util.Map; - -public class PriorityQueue extends OutOfOrderQueue +@ManagedObject( category = false, type="priority" ) +public interface PriorityQueue<X extends PriorityQueue<X>> extends AMQQueue<X> { + String PRIORITIES = "priorities"; - public static final int DEFAULT_PRIORITY_LEVELS = 10; - - protected PriorityQueue(VirtualHostImpl virtualHost, - Map<String, Object> attributes) - { - super(virtualHost, attributes, entryList(attributes)); - } - - private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes) - { - final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes, - DEFAULT_PRIORITY_LEVELS); - - return new PriorityQueueList.Factory(priorities); - } - - public int getPriorities() - { - return getEntries().getPriorities(); - } + @ManagedAttribute + int getPriorities(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java new file mode 100644 index 0000000000..4f73bea8e6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java @@ -0,0 +1,64 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class PriorityQueueImpl extends OutOfOrderQueue<PriorityQueueImpl> implements PriorityQueue<PriorityQueueImpl> +{ + + public static final int DEFAULT_PRIORITY_LEVELS = 10; + + protected PriorityQueueImpl(VirtualHostImpl virtualHost, + Map<String, Object> attributes) + { + super(virtualHost, attributes, entryList(attributes)); + } + + private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes) + { + final Integer priorities = MapValueConverter.getIntegerAttribute(PRIORITIES, attributes, + DEFAULT_PRIORITY_LEVELS); + + return new PriorityQueueList.Factory(priorities); + } + + @Override + public int getPriorities() + { + return getEntries().getPriorities(); + } + + @Override + public Object getAttribute(final String name) + { + + if(PRIORITIES.equals(name)) + { + return getPriorities(); + } + + return super.getAttribute(name); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index d47b7ce2cd..f72c8cd57a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -26,7 +26,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList { - public PriorityQueueList(final PriorityQueue queue, + public PriorityQueueList(final PriorityQueueImpl queue, final HeadCreator headCreator) { super(queue, headCreator); @@ -43,12 +43,12 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList return null; } }; - private final PriorityQueue _queue; + private final PriorityQueueImpl _queue; private final PriorityQueueEntrySubList[] _priorityLists; private final int _priorities; private final int _priorityOffset; - public PriorityQueueMasterList(PriorityQueue queue, int priorities) + public PriorityQueueMasterList(PriorityQueueImpl queue, int priorities) { super(queue, DUMMY_HEAD_CREATOR); _queue = queue; @@ -67,7 +67,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList } @Override - public PriorityQueue getQueue() + public PriorityQueueImpl getQueue() { return _queue; } @@ -196,7 +196,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList public PriorityQueueList createQueueEntryList(AMQQueue<?> queue) { - return new PriorityQueueMasterList((PriorityQueue) queue, _priorities); + return new PriorityQueueMasterList((PriorityQueueImpl) queue, _priorities); } } @@ -212,7 +212,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList }; private int _listPriority; - public PriorityQueueEntrySubList(PriorityQueue queue, int listPriority) + public PriorityQueueEntrySubList(PriorityQueueImpl queue, int listPriority) { super(queue, HEAD_CREATOR); _listPriority = listPriority; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 757db35af9..c1687de86e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -78,9 +78,9 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); - ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, Queue.SORT_KEY); - ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, Queue.LVQ_KEY); - ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, Queue.PRIORITIES); + ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, SortedQueue.SORT_KEY); + ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, LastValueQueue.LVQ_KEY); + ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES); ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); @@ -108,7 +108,7 @@ public class QueueArgumentsConverter } if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) { - modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); + modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY); } if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 5f7ea14889..19f9f6c427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,54 +20,16 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -import java.util.Map; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; -public class SortedQueue extends OutOfOrderQueue +@ManagedObject( category = false, type="sorted" ) +public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X> { - //Lock object to synchronize enqueue. Used instead of the object - //monitor to prevent lock order issues with consumer sendLocks - //and consumer updates in the super classes - private final Object _sortedQueueLock = new Object(); - private final String _sortedPropertyName; - - protected SortedQueue(VirtualHostImpl virtualHost, - Map<String, Object> attributes, - QueueEntryListFactory factory) - { - super(virtualHost, attributes, factory); - _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes); - } - - - protected SortedQueue(VirtualHostImpl virtualHost, - Map<String, Object> attributes) - { - this(virtualHost, - attributes, - new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes))); - } - + String SORT_KEY = "sortKey"; + @ManagedAttribute + String getSortKey(); - public String getSortedPropertyName() - { - return _sortedPropertyName; - } - @Override - public void enqueue(final ServerMessage message, - final Action<? super MessageInstance> action) - { - synchronized (_sortedQueueLock) - { - super.enqueue(message, action); - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 990ca76d67..92abe30442 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -36,17 +36,17 @@ public class SortedQueueEntryList implements QueueEntryList private SortedQueueEntry _root; private long _entryId = Long.MIN_VALUE; private final Object _lock = new Object(); - private final SortedQueue _queue; + private final SortedQueueImpl _queue; private final String _propertyName; - public SortedQueueEntryList(final SortedQueue queue, final String propertyName) + public SortedQueueEntryList(final SortedQueueImpl queue, final String propertyName) { _queue = queue; _head = new SortedQueueEntry(this); _propertyName = propertyName; } - public SortedQueue getQueue() + public SortedQueueImpl getQueue() { return _queue; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java index ed80b33234..5cab126374 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java @@ -32,6 +32,6 @@ public class SortedQueueEntryListFactory implements QueueEntryListFactory @Override public SortedQueueEntryList createQueueEntryList(final AMQQueue<?> queue) { - return new SortedQueueEntryList((SortedQueue) queue, _propertyName); + return new SortedQueueEntryList((SortedQueueImpl) queue, _propertyName); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java new file mode 100644 index 0000000000..3115f6f581 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements SortedQueue<SortedQueueImpl> +{ + //Lock object to synchronize enqueue. Used instead of the object + //monitor to prevent lock order issues with consumer sendLocks + //and consumer updates in the super classes + private final Object _sortedQueueLock = new Object(); + private final String _sortedPropertyName; + + protected SortedQueueImpl(VirtualHostImpl virtualHost, + Map<String, Object> attributes, + QueueEntryListFactory factory) + { + super(virtualHost, attributes, factory); + _sortedPropertyName = MapValueConverter.getStringAttribute(SORT_KEY,attributes); + } + + + protected SortedQueueImpl(VirtualHostImpl virtualHost, + Map<String, Object> attributes) + { + this(virtualHost, + attributes, + new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(SORT_KEY, attributes))); + } + + + + public String getSortedPropertyName() + { + return _sortedPropertyName; + } + + @Override + public void enqueue(final ServerMessage message, + final Action<? super MessageInstance> action) + { + synchronized (_sortedQueueLock) + { + super.enqueue(message, action); + } + } + + @Override + public Object getAttribute(final String name) + { + + if(SORT_KEY.equals(name)) + { + return getSortedPropertyName(); + } + + return super.getAttribute(name); + } + + @Override + public String getSortKey() + { + return getSortedPropertyName(); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java index 44578375e0..8d040064a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java @@ -20,15 +20,9 @@ */ package org.apache.qpid.server.queue; -import java.util.Map; +import org.apache.qpid.server.model.ManagedObject; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class StandardQueue extends AbstractQueue +@ManagedObject( category = false, type="standard" ) +public interface StandardQueue<X extends StandardQueue<X>> extends AMQQueue<X> { - public StandardQueue(final VirtualHostImpl virtualHost, - final Map<String, Object> arguments) - { - super(virtualHost, arguments, new StandardQueueEntryList.Factory()); - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java index 0b8b7f7c6b..341e2365c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java @@ -34,7 +34,7 @@ public class StandardQueueEntryList extends OrderedQueueEntryList } }; - public StandardQueueEntryList(final StandardQueue queue) + public StandardQueueEntryList(final StandardQueueImpl queue) { super(queue, HEAD_CREATOR); } @@ -50,7 +50,7 @@ public class StandardQueueEntryList extends OrderedQueueEntryList public StandardQueueEntryList createQueueEntryList(AMQQueue<?> queue) { - return new StandardQueueEntryList((StandardQueue) queue); + return new StandardQueueEntryList((StandardQueueImpl) queue); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java new file mode 100644 index 0000000000..4dc57b14a7 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class StandardQueueImpl extends AbstractQueue<StandardQueueImpl> implements StandardQueue<StandardQueueImpl> +{ + public StandardQueueImpl(final VirtualHostImpl virtualHost, + final Map<String, Object> arguments) + { + super(virtualHost, arguments, new StandardQueueEntryList.Factory()); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index c49f626242..b4a54aa5cb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -70,9 +70,12 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.LastValueQueueImpl; import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -627,9 +630,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte // make a copy as we may augment (with an ID for example) attributes = new LinkedHashMap<String, Object>(attributes); - if (attributes.containsKey(Queue.QUEUE_TYPE)) + if (attributes.containsKey(Queue.TYPE)) { - String typeAttribute = MapValueConverter.getStringAttribute(Queue.QUEUE_TYPE, attributes, null); + String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null); QueueType queueType = null; try { @@ -639,15 +642,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { throw new IllegalArgumentException("Unsupported queue type :" + typeAttribute); } - if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) + if (queueType == QueueType.LVQ && attributes.get(LastValueQueue.LVQ_KEY) == null) { - attributes.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); + attributes.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY); } - else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) + else if (queueType == QueueType.PRIORITY && attributes.get(PriorityQueue.PRIORITIES) == null) { - attributes.put(Queue.PRIORITIES, 10); + attributes.put(PriorityQueue.PRIORITIES, 10); } - else if (queueType == QueueType.SORTED && attributes.get(Queue.SORT_KEY) == null) + else if (queueType == QueueType.SORTED && attributes.get(SortedQueue.SORT_KEY) == null) { throw new IllegalArgumentException("Sort key is not specified for sorted queue"); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index ed2de21b6b..8ec78e952e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -195,12 +195,12 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, "testPriorityQueue"); - attributes.put(Queue.PRIORITIES, 5); + attributes.put(PriorityQueue.PRIORITIES, 5); AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass()); + assertEquals("Queue not a priority queue", PriorityQueueImpl.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); verifyRegisteredQueueCount(1); } @@ -217,7 +217,7 @@ public class AMQQueueFactoryTest extends QpidTestCase AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass()); + assertEquals("Queue not a simple queue", StandardQueueImpl.class, queue.getClass()); verifyQueueRegistered(queueName); //verify that no alternate exchange or DLQ were produced diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index eb0ab8633e..c62b541191 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -1105,7 +1105,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase } - private static class NonAsyncDeliverQueue extends AbstractQueue + private static class NonAsyncDeliverQueue extends AbstractQueue<NonAsyncDeliverQueue> { public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHostImpl vhost) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index 5c049b7d3f..bc1d89a280 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -21,21 +21,22 @@ package org.apache.qpid.server.queue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import junit.framework.TestCase; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -public class ConflationQueueListTest extends TestCase +public class LastValueQueueListTest extends TestCase { private static final String CONFLATION_KEY = "CONFLATION_KEY"; @@ -43,8 +44,8 @@ public class ConflationQueueListTest extends TestCase private static final String TEST_KEY_VALUE1 = "testKeyValue1"; private static final String TEST_KEY_VALUE2 = "testKeyValue2"; - private ConflationQueueList _list; - private ConflationQueue _queue; + private LastValueQueueList _list; + private LastValueQueueImpl _queue; @Override protected void setUp() throws Exception @@ -53,12 +54,12 @@ public class ConflationQueueListTest extends TestCase Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY); + queueAttributes.put(LastValueQueue.LVQ_KEY, CONFLATION_KEY); final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _queue = new ConflationQueue(virtualHost, queueAttributes); - _list = (ConflationQueueList) _queue.getEntries(); + _queue = new LastValueQueueImpl(virtualHost, queueAttributes); + _list = (LastValueQueueList) _queue.getEntries(); } public void testListHasNoEntries() @@ -188,7 +189,7 @@ public class ConflationQueueListTest extends TestCase assertEquals(0, _list.getLatestValuesMap().size()); } - private int countEntries(ConflationQueueList list) + private int countEntries(LastValueQueueList list) { QueueEntryIterator iterator = list.iterator(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 50cf4f44aa..ad677a98a7 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.queue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -33,10 +37,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - public class PriorityQueueListTest extends QpidTestCase { private static final byte[] PRIORITIES = {4, 5, 5, 4}; @@ -54,11 +54,11 @@ public class PriorityQueueListTest extends QpidTestCase Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.PRIORITIES, 10); + queueAttributes.put(PriorityQueue.PRIORITIES, 10); final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - PriorityQueue queue = new PriorityQueue(virtualHost, queueAttributes); + PriorityQueueImpl queue = new PriorityQueueImpl(virtualHost, queueAttributes); _list = (PriorityQueueList) queue.getEntries(); for (int i = 0; i < PRIORITIES.length; i++) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 09cec61909..fdc0411f95 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -20,7 +20,12 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; + import junit.framework.AssertionFailedError; import org.apache.qpid.server.consumer.ConsumerImpl; @@ -28,20 +33,13 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; -import java.util.ArrayList; -import java.util.EnumSet; - -import org.apache.qpid.server.model.Queue; - -import static org.mockito.Mockito.when; - public class PriorityQueueTest extends AbstractQueueTestBase { @Override public void setUp() throws Exception { - setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3)); + setArguments(Collections.singletonMap(PriorityQueue.PRIORITIES,(Object)3)); super.setUp(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index ef2a8ebccf..a5a25994ca 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -199,7 +199,7 @@ public abstract class QueueEntryImplTestBase extends TestCase when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); + StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); OrderedQueueEntryList queueEntryList = (OrderedQueueEntryList) queue.getEntries(); // create test entries diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java index 0ea4ee930d..dc4609734d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -30,13 +30,13 @@ import org.apache.qpid.server.queue.SortedQueueEntry.Colour; */ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList { - public SelfValidatingSortedQueueEntryList(SortedQueue queue, String propertyName) + public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue, String propertyName) { super(queue, propertyName); } @Override - public SortedQueue getQueue() + public SortedQueueImpl getQueue() { return super.getQueue(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index cb2e9bf062..61e396ac27 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -47,7 +47,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); + StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); queueEntryList = (OrderedQueueEntryList) queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index 0334b9d93b..bcae391b92 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -19,7 +19,15 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; @@ -30,15 +38,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class SortedQueueEntryListTest extends QueueEntryListTestBase { private static SelfValidatingSortedQueueEntryList _sqel; @@ -71,7 +70,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase private final static String keysSorted[] = keys.clone(); - private SortedQueue _testQueue; + private SortedQueueImpl _testQueue; @Override protected void setUp() throws Exception @@ -81,19 +80,19 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase attributes.put(Queue.NAME, getName()); attributes.put(Queue.DURABLE, false); attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - attributes.put(Queue.SORT_KEY, "KEY"); + attributes.put(SortedQueue.SORT_KEY, "KEY"); // Create test list final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _testQueue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory() + _testQueue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory() { @Override public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) { - return new SelfValidatingSortedQueueEntryList((SortedQueue) queue, "KEY"); + return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY"); } }); _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries(); @@ -149,7 +148,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase } @Override - protected SortedQueue getTestQueue() + protected SortedQueueImpl getTestQueue() { return _testQueue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 2a96f08c53..90c4a82747 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -19,6 +19,10 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,10 +36,6 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class SortedQueueEntryTest extends QueueEntryImplTestBase { @@ -51,18 +51,18 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase attributes.put(Queue.NAME, getName()); attributes.put(Queue.DURABLE, false); attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - attributes.put(Queue.SORT_KEY, "KEY"); + attributes.put(SortedQueue.SORT_KEY, "KEY"); final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - SortedQueue queue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory() + SortedQueueImpl queue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory() { @Override public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) { - return new SelfValidatingSortedQueueEntryList((SortedQueue) queue, "KEY"); + return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY"); } }); _queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 27cbe7dc8d..2bab20a1b0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -38,7 +38,7 @@ import static org.mockito.Mockito.when; public class StandardQueueEntryListTest extends QueueEntryListTestBase { - private StandardQueue _testQueue; + private StandardQueueImpl _testQueue; private StandardQueueEntryList _sqel; private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; @@ -55,7 +55,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _testQueue = new StandardQueue(virtualHost, queueAttributes); + _testQueue = new StandardQueueImpl(virtualHost, queueAttributes); _sqel = (StandardQueueEntryList) _testQueue.getEntries(); for(int i = 1; i <= 100; i++) @@ -101,7 +101,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); + StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); return (StandardQueueEntryList) queue.getEntries(); } @@ -132,7 +132,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase } @Override - protected StandardQueue getTestQueue() + protected StandardQueueImpl getTestQueue() { return _testQueue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 37519e7a0b..f6d04175c5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -49,7 +49,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getQname()); queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); - final StandardQueue queue = new StandardQueue(getVirtualHost(), queueAttributes); + final StandardQueueImpl queue = new StandardQueueImpl(getVirtualHost(), queueAttributes); queue.open(); setQueue(queue); @@ -72,7 +72,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "testActiveConsumerCount"); queueAttributes.put(Queue.OWNER, "testOwner"); - final StandardQueue queue = new StandardQueue(getVirtualHost(), queueAttributes); + final StandardQueueImpl queue = new StandardQueueImpl(getVirtualHost(), queueAttributes); queue.open(); //verify adding an active consumer increases the count final MockConsumer consumer1 = new MockConsumer(); @@ -180,7 +180,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "test"); // create queue with overridden method deliverAsync - StandardQueue testQueue = new StandardQueue(getVirtualHost(), queueAttributes) + StandardQueueImpl testQueue = new StandardQueueImpl(getVirtualHost(), queueAttributes) { @Override public void deliverAsync(QueueConsumer sub) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html index 410a5d23ca..9a24e23407 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html @@ -35,16 +35,16 @@ <tr> <td valign="top"><strong>Queue Type: </strong></td> <td> - <input type="radio" id="formAddQueueTypeStandard" name="queueType" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypeStandard">Standard</label> - <input type="radio" id="formAddQueueTypePriority" name="queueType" value="priority" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypePriority" name="type" value="priority" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypePriority">Priority</label> - <input type="radio" id="formAddQueueTypeLVQ" name="queueType" value="lvq" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypeLVQ" name="type" value="lvq" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypeLVQ">LVQ</label> - <input type="radio" id="formAddQueueTypeSorted" name="queueType" value="sorted" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypeSorted" name="type" value="sorted" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypeSorted">Sorted</label> </td> </tr> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 606f95e349..c1442e5edc 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,7 +277,7 @@ define(["dojo/_base/xhr", "exclusive", "owner", "lifetimePolicy", - "queueType", + "type", "typeQualifier", "alertRepeatGap", "alertRepeatGapUnits", @@ -359,14 +359,14 @@ define(["dojo/_base/xhr", bytesDepth = formatter.formatBytes( this.queueData["unacknowledgedBytes"] ); this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value; this.unacknowledgedBytesUnits.innerHTML = bytesDepth.units + ")"; - this.queueType.innerHTML = entities.encode(this.queueData[ "queueType" ]); - if (this.queueData.queueType == "standard") + this["type" ].innerHTML = entities.encode(this.queueData[ "type" ]); + if (this.queueData["type"] == "standard") { this.typeQualifier.style.display = "none"; } else { - this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.queueType] + ": " + this.queueData[queueTypeKeys[this.queueData.queueType]] + ")"); + this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData[ "type" ]] + ": " + this.queueData[queueTypeKeys[this.queueData[ "type" ]]] + ")"); } if(this.queueData["messageGroupKey"]) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js index 64e821b2dd..64b31309d9 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js @@ -96,7 +96,7 @@ define(["dojo/_base/xhr", } } else if (!typeSpecificFields.hasOwnProperty(propName) || - formValues.queueType === typeSpecificFields[ propName ]) { + formValues[ "type" ] === typeSpecificFields[ propName ]) { if(formValues[ propName ] !== "") { if (fieldConverters.hasOwnProperty(propName)) { @@ -130,7 +130,7 @@ define(["dojo/_base/xhr", theForm = registry.byId("formAddQueue"); array.forEach(theForm.getDescendants(), function(widget) { - if(widget.name === "queueType") { + if(widget.name === "type") { widget.on("change", function(isChecked) { var objId = widget.id + ":fields"; @@ -195,4 +195,4 @@ define(["dojo/_base/xhr", }; return addQueue; - });
\ No newline at end of file + }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 252deb3100..89b7327957 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -47,7 +47,7 @@ <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 150px;">Type:</div> <div style="float:left;"> - <span class="queueType"></span> + <span class="type"></span> <span class="typeQualifier"></span> </div> </div> diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 74183eafc5..e95ed9a383 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -185,7 +185,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN @Override public String getQueueType() { - return (String) _queue.getAttribute(Queue.QUEUE_TYPE); + return (String) _queue.getAttribute(Queue.TYPE); } public boolean isDurable() diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index 2df196eff6..a81de49fb9 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -131,7 +131,7 @@ public class QueueMBeanTest extends QpidTestCase public void testQueueType() throws Exception { - when(_mockQueue.getAttribute(Queue.QUEUE_TYPE)).thenReturn(QUEUE_TYPE); + when(_mockQueue.getAttribute(Queue.TYPE)).thenReturn(QUEUE_TYPE); MBeanTestUtils.assertMBeanAttribute(_queueMBean, "queueType", QUEUE_TYPE); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/LastValueQueueTest.java index 0e59e9cceb..dc30c02951 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/LastValueQueueTest.java @@ -47,9 +47,9 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -public class ConflationQueueTest extends QpidBrokerTestCase +public class LastValueQueueTest extends QpidBrokerTestCase { - private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class); + private static final Logger LOGGER = Logger.getLogger(LastValueQueueTest.class); private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; private static final String KEY_PROPERTY = "key"; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index d857bb4ce0..9949102af8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -50,9 +50,11 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.LastValueQueueImpl; import org.apache.qpid.server.queue.PriorityQueue; -import org.apache.qpid.server.queue.StandardQueue; +import org.apache.qpid.server.queue.PriorityQueueImpl; +import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -553,18 +555,18 @@ public class VirtualHostMessageStoreTest extends QpidTestCase if (usePriority) { - assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass()); + assertEquals("Queue is no longer a Priority Queue", PriorityQueueImpl.class, queue.getClass()); assertEquals("Priority Queue does not have set priorities", - DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities()); + DEFAULT_PRIORTY_LEVEL, ((PriorityQueueImpl) queue).getPriorities()); } else if (lastValueQueue) { - assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); - assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); + assertEquals("Queue is no longer a LastValue Queue", LastValueQueueImpl.class, queue.getClass()); + assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((LastValueQueueImpl) queue).getConflationKey()); } else { - assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); + assertEquals("Queue is not 'simple'", StandardQueueImpl.class, queue.getClass()); } assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner()); @@ -660,12 +662,12 @@ public class VirtualHostMessageStoreTest extends QpidTestCase if (usePriority) { - queueArguments.put(Queue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(PriorityQueue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); } if (lastValueQueue) { - queueArguments.put(Queue.LVQ_KEY, LVQ_KEY); + queueArguments.put(LastValueQueue.LVQ_KEY, LVQ_KEY); } queueArguments.put(Queue.ID, UUIDGenerator.generateRandomUUID()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 931974942f..3bd91faa3e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -28,7 +28,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.queue.NotificationCheckTest; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.queue.StandardQueue; +import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -660,7 +660,7 @@ public class QueueManagementTest extends QpidBrokerTestCase final Object messageGroupKey = "test"; final Map<String, Object> arguments = new HashMap<String, Object>(2); arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); - arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueue.SHARED_MSG_GROUP_ARG_VALUE); + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueueImpl.SHARED_MSG_GROUP_ARG_VALUE); managedBroker.createNewQueue(queueName, null, true, arguments); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index a2bb05dd00..47a25dbbec 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -43,6 +43,9 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.test.utils.TestBrokerConfiguration; public class Asserts @@ -113,11 +116,11 @@ public class Asserts Queue.ALTERNATE_EXCHANGE, Queue.OWNER, Queue.NO_LOCAL, - Queue.LVQ_KEY, - Queue.SORT_KEY, + LastValueQueue.LVQ_KEY, + SortedQueue.SORT_KEY, Queue.MESSAGE_GROUP_KEY, Queue.MESSAGE_GROUP_SHARED_GROUPS, - Queue.PRIORITIES, + PriorityQueue.PRIORITIES, ConfiguredObject.CONTEXT); assertEquals("Unexpected value of queue attribute " + Queue.NAME, queueName, queueData.get(Queue.NAME)); @@ -127,9 +130,9 @@ public class Asserts queueData.get(Queue.STATE)); assertEquals("Unexpected value of queue attribute " + Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name(), queueData.get(Queue.LIFETIME_POLICY)); - assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_TYPE, + assertEquals("Unexpected value of queue attribute " + Queue.TYPE, queueType, - queueData.get(Queue.QUEUE_TYPE)); + queueData.get(Queue.TYPE)); if (expectedAttributes == null) { assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 421c609e46..4535425ea4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -37,7 +37,10 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.LastValueQueueImpl; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.util.FileUtils; @@ -193,15 +196,15 @@ public class VirtualHostRestTest extends QpidRestTestCase createQueue(queueName + "-standard", "standard", null); Map<String, Object> sortedQueueAttributes = new HashMap<String, Object>(); - sortedQueueAttributes.put(Queue.SORT_KEY, "sortme"); + sortedQueueAttributes.put(SortedQueue.SORT_KEY, "sortme"); createQueue(queueName + "-sorted", "sorted", sortedQueueAttributes); Map<String, Object> priorityQueueAttributes = new HashMap<String, Object>(); - priorityQueueAttributes.put(Queue.PRIORITIES, 10); + priorityQueueAttributes.put(PriorityQueue.PRIORITIES, 10); createQueue(queueName + "-priority", "priority", priorityQueueAttributes); Map<String, Object> lvqQueueAttributes = new HashMap<String, Object>(); - lvqQueueAttributes.put(Queue.LVQ_KEY, "LVQ"); + lvqQueueAttributes.put(LastValueQueue.LVQ_KEY, "LVQ"); createQueue(queueName + "-lvq", "lvq", lvqQueueAttributes); Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); @@ -223,9 +226,9 @@ public class VirtualHostRestTest extends QpidRestTestCase assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, priorityQueue.get(Queue.DURABLE)); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(Queue.SORT_KEY)); - assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(Queue.LVQ_KEY)); - assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(Queue.PRIORITIES)); + assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(SortedQueue.SORT_KEY)); + assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(LastValueQueue.LVQ_KEY)); + assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } public void testPutCreateExchange() throws Exception @@ -271,7 +274,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "lvq", lvqQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected lvq key attribute", ConflationQueue.DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY)); + assertEquals("Unexpected lvq key attribute", LastValueQueueImpl.DEFAULT_LVQ_KEY, lvqQueue.get(LastValueQueue.LVQ_KEY)); } public void testPutCreateSortedQueueWithoutKey() throws Exception @@ -302,7 +305,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "priority", priorityQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, priorityQueue.get(Queue.DURABLE)); - assertEquals("Unexpected number of priorities", 10, priorityQueue.get(Queue.PRIORITIES)); + assertEquals("Unexpected number of priorities", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } public void testPutCreateStandardQueueWithoutType() throws Exception @@ -401,17 +404,17 @@ public class VirtualHostRestTest extends QpidRestTestCase Map<String, Object> sortedQueueAttributes = new HashMap<String, Object>(); sortedQueueAttributes.putAll(attributes); - sortedQueueAttributes.put(Queue.SORT_KEY, "sortme"); + sortedQueueAttributes.put(SortedQueue.SORT_KEY, "sortme"); createQueue(queueName + "-sorted", "sorted", sortedQueueAttributes); Map<String, Object> priorityQueueAttributes = new HashMap<String, Object>(); priorityQueueAttributes.putAll(attributes); - priorityQueueAttributes.put(Queue.PRIORITIES, 10); + priorityQueueAttributes.put(PriorityQueue.PRIORITIES, 10); createQueue(queueName + "-priority", "priority", priorityQueueAttributes); Map<String, Object> lvqQueueAttributes = new HashMap<String, Object>(); lvqQueueAttributes.putAll(attributes); - lvqQueueAttributes.put(Queue.LVQ_KEY, "LVQ"); + lvqQueueAttributes.put(LastValueQueue.LVQ_KEY, "LVQ"); createQueue(queueName + "-lvq", "lvq", lvqQueueAttributes); Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); @@ -429,9 +432,9 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName + "-priority", "priority", priorityQueue, attributes); Asserts.assertQueue(queueName + "-lvq", "lvq", lvqQueue, attributes); - assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(Queue.SORT_KEY)); - assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(Queue.LVQ_KEY)); - assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(Queue.PRIORITIES)); + assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(SortedQueue.SORT_KEY)); + assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(LastValueQueue.LVQ_KEY)); + assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } @SuppressWarnings("unchecked") @@ -506,7 +509,7 @@ public class VirtualHostRestTest extends QpidRestTestCase queueData.put(Queue.DURABLE, Boolean.TRUE); if (queueType != null) { - queueData.put(Queue.QUEUE_TYPE, queueType); + queueData.put(Queue.TYPE, queueType); } if (attributes != null) { |
