From 672f3f7ab44bb4666deb95e80008a9d3e7b35806 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 17 Apr 2014 12:38:47 +0000 Subject: QPID-5580 : [Java Broker] Introduce explicit type hierarchy for queues in the ConfiguredObject model git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588234 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/model/Queue.java | 17 +- .../apache/qpid/server/queue/AMQQueueFactory.java | 14 +- .../apache/qpid/server/queue/AbstractQueue.java | 68 +-- .../apache/qpid/server/queue/ConflationQueue.java | 57 -- .../qpid/server/queue/ConflationQueueList.java | 272 ---------- .../apache/qpid/server/queue/LastValueQueue.java | 33 ++ .../qpid/server/queue/LastValueQueueImpl.java | 74 +++ .../qpid/server/queue/LastValueQueueList.java | 273 ++++++++++ .../apache/qpid/server/queue/OutOfOrderQueue.java | 6 +- .../apache/qpid/server/queue/PriorityQueue.java | 71 +-- .../qpid/server/queue/PriorityQueueImpl.java | 64 +++ .../qpid/server/queue/PriorityQueueList.java | 12 +- .../qpid/server/queue/QueueArgumentsConverter.java | 8 +- .../org/apache/qpid/server/queue/SortedQueue.java | 53 +- .../qpid/server/queue/SortedQueueEntryList.java | 6 +- .../server/queue/SortedQueueEntryListFactory.java | 2 +- .../apache/qpid/server/queue/SortedQueueImpl.java | 89 ++++ .../apache/qpid/server/queue/StandardQueue.java | 12 +- .../qpid/server/queue/StandardQueueEntryList.java | 4 +- .../qpid/server/queue/StandardQueueImpl.java | 34 ++ .../server/virtualhost/AbstractVirtualHost.java | 19 +- .../qpid/server/queue/AMQQueueFactoryTest.java | 6 +- .../qpid/server/queue/AbstractQueueTestBase.java | 2 +- .../qpid/server/queue/ConflationQueueListTest.java | 226 -------- .../qpid/server/queue/LastValueQueueListTest.java | 227 ++++++++ .../qpid/server/queue/PriorityQueueListTest.java | 12 +- .../qpid/server/queue/PriorityQueueTest.java | 14 +- .../qpid/server/queue/QueueEntryImplTestBase.java | 2 +- .../queue/SelfValidatingSortedQueueEntryList.java | 4 +- .../server/queue/SimpleQueueEntryImplTest.java | 2 +- .../server/queue/SortedQueueEntryListTest.java | 27 +- .../qpid/server/queue/SortedQueueEntryTest.java | 14 +- .../server/queue/StandardQueueEntryListTest.java | 8 +- .../qpid/server/queue/StandardQueueTest.java | 6 +- .../src/main/java/resources/addQueue.html | 8 +- .../java/resources/js/qpid/management/Queue.js | 8 +- .../java/resources/js/qpid/management/addQueue.js | 6 +- .../src/main/java/resources/showQueue.html | 2 +- .../apache/qpid/server/jmx/mbeans/QueueMBean.java | 2 +- .../qpid/server/jmx/mbeans/QueueMBeanTest.java | 2 +- .../qpid/server/queue/ConflationQueueTest.java | 574 --------------------- .../qpid/server/queue/LastValueQueueTest.java | 574 +++++++++++++++++++++ .../server/store/VirtualHostMessageStoreTest.java | 20 +- .../management/jmx/QueueManagementTest.java | 4 +- .../java/org/apache/qpid/systest/rest/Asserts.java | 13 +- .../qpid/systest/rest/VirtualHostRestTest.java | 35 +- 46 files changed, 1547 insertions(+), 1439 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/queue/LastValueQueueTest.java diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index eff15c4905..fed8862782 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -24,8 +24,7 @@ import java.util.Collection; import org.apache.qpid.server.queue.QueueEntryVisitor; -@ManagedObject - +@ManagedObject( defaultType = "standard" ) public interface Queue> extends ConfiguredObject { @@ -39,22 +38,15 @@ public interface Queue> extends ConfiguredObject String MESSAGE_GROUP_KEY = "messageGroupKey"; String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; - String LVQ_KEY = "lvqKey"; String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; String NO_LOCAL = "noLocal"; String OWNER = "owner"; String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; String QUEUE_FLOW_STOPPED = "queueFlowStopped"; - String SORT_KEY = "sortKey"; - String QUEUE_TYPE = "queueType"; - String PRIORITIES = "priorities"; String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change - @ManagedAttribute - String getQueueType(); - @ManagedAttribute Exchange getAlternateExchange(); @@ -67,11 +59,6 @@ public interface Queue> extends ConfiguredObject @ManagedAttribute boolean getNoLocal(); - @ManagedAttribute - String getLvqKey(); - - @ManagedAttribute - String getSortKey(); @ManagedAttribute String getMessageGroupKey(); @@ -135,8 +122,6 @@ public interface Queue> extends ConfiguredObject @ManagedAttribute( automate = true, defaultValue = "${queue.alertRepeatGap}") long getAlertRepeatGap(); - @ManagedAttribute - int getPriorities(); //children Collection getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 9a06569397..9a051be324 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -82,21 +82,21 @@ public class AMQQueueFactory implements QueueFactory AMQQueue queue; - if(attributes.containsKey(Queue.SORT_KEY)) + if(attributes.containsKey(SortedQueue.SORT_KEY)) { - queue = new SortedQueue(_virtualHost, attributes); + queue = new SortedQueueImpl(_virtualHost, attributes); } - else if(attributes.containsKey(Queue.LVQ_KEY)) + else if(attributes.containsKey(LastValueQueue.LVQ_KEY)) { - queue = new ConflationQueue(_virtualHost, attributes); + queue = new LastValueQueueImpl(_virtualHost, attributes); } - else if(attributes.containsKey(Queue.PRIORITIES)) + else if(attributes.containsKey(PriorityQueue.PRIORITIES)) { - queue = new PriorityQueue(_virtualHost, attributes); + queue = new PriorityQueueImpl(_virtualHost, attributes); } else { - queue = new StandardQueue(_virtualHost, attributes); + queue = new StandardQueueImpl(_virtualHost, attributes); } queue.open(); //Register the new queue diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 7738281034..c015bd6d7c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -89,9 +89,9 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -public abstract class AbstractQueue - extends AbstractConfiguredObject - implements AMQQueue, +public abstract class AbstractQueue> + extends AbstractConfiguredObject + implements AMQQueue, StateChangeListener, State>, MessageGroupManager.ConsumerResetHelper { @@ -543,40 +543,10 @@ public abstract class AbstractQueue //We only return the boolean value if message groups are actually in use return _arguments.get(MESSAGE_GROUP_KEY) == null ? null : _arguments.get(MESSAGE_GROUP_SHARED_GROUPS); } - else if(LVQ_KEY.equals(name)) - { - if(this instanceof ConflationQueue) - { - return ((ConflationQueue)this).getConflationKey(); - } - } else if(QUEUE_FLOW_STOPPED.equals(name)) { return isOverfull(); } - else if(SORT_KEY.equals(name)) - { - if(this instanceof SortedQueue) - { - return ((SortedQueue)this).getSortedPropertyName(); - } - } - else if(QUEUE_TYPE.equals(name)) - { - if(this instanceof SortedQueue) - { - return "sorted"; - } - if(this instanceof ConflationQueue) - { - return "lvq"; - } - if(this instanceof PriorityQueue) - { - return "priority"; - } - return "standard"; - } else if(STATE.equals(name)) { return State.ACTIVE; // TODO @@ -585,13 +555,6 @@ public abstract class AbstractQueue { return getDescription(); } - else if(PRIORITIES.equals(name)) - { - if(this instanceof PriorityQueue) - { - return ((PriorityQueue)this).getPriorities(); - } - } return super.getAttribute(name); } @@ -2678,12 +2641,6 @@ public abstract class AbstractQueue return false; } - @Override - public String getQueueType() - { - return null; - } - @Override public ExclusivityPolicy getExclusive() { @@ -2696,18 +2653,6 @@ public abstract class AbstractQueue return _noLocal; } - @Override - public String getLvqKey() - { - return null; - } - - @Override - public String getSortKey() - { - return null; - } - @Override public String getMessageGroupKey() { @@ -2727,13 +2672,6 @@ public abstract class AbstractQueue return false; } - @Override - public int getPriorities() - { - return 0; - } - - @Override public State getState() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java deleted file mode 100644 index 9b5b7b23cb..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import java.util.Map; - -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class ConflationQueue extends AbstractQueue -{ - public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; - - - protected ConflationQueue(VirtualHostImpl virtualHost, - Map attributes) - { - super(virtualHost, attributes, entryList(attributes)); - } - - private static ConflationQueueList.Factory entryList(final Map attributes) - { - - String conflationKey = MapValueConverter.getStringAttribute(Queue.LVQ_KEY, - attributes, - DEFAULT_LVQ_KEY); - - // conflation key can still be null if it was present in the map with a null value - return new ConflationQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); - } - - public String getConflationKey() - { - return ((ConflationQueueList)getEntries()).getConflationKey(); - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java deleted file mode 100644 index c234b9cbb3..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -public class ConflationQueueList extends OrderedQueueEntryList -{ - private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class); - - private static final HeadCreator HEAD_CREATOR = new HeadCreator() - { - - @Override - public ConflationQueueEntry createHead(final QueueEntryList list) - { - return ((ConflationQueueList)list).createHead(); - } - }; - - private final String _conflationKey; - private final ConcurrentHashMap> _latestValuesMap = - new ConcurrentHashMap>(); - - private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this); - private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this); - - public ConflationQueueList(ConflationQueue queue, String conflationKey) - { - super(queue, HEAD_CREATOR); - _conflationKey = conflationKey; - } - - private ConflationQueueEntry createHead() - { - return new ConflationQueueEntry(this); - } - - public String getConflationKey() - { - return _conflationKey; - } - - @Override - protected ConflationQueueEntry createQueueEntry(ServerMessage message) - { - return new ConflationQueueEntry(this, message); - } - - - - /** - * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. - */ - @Override - public ConflationQueueEntry add(final ServerMessage message) - { - final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message); - - final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); - if (keyValue != null) - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue); - } - - final AtomicReference referenceToEntry = new AtomicReference(addedEntry); - AtomicReference entryReferenceFromMap; - ConflationQueueEntry entryFromMap; - - // Iterate until we have got a valid atomic reference object and either the referent is newer than the current - // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value - // indicating that the reference object is no longer valid (it is being removed from the map). - boolean keepTryingToUpdateEntryReference; - do - { - do - { - entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry); - - // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value) - entryFromMap = entryReferenceFromMap.get(); - } - while(entryFromMap == _deleteInProgress); - - boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0; - - keepTryingToUpdateEntryReference = entryFromMapIsOlder - && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry); - } - while(keepTryingToUpdateEntryReference); - - if (entryFromMap == _newerEntryAlreadyBeenAndGone) - { - discardEntry(addedEntry); - } - else if (entryFromMap.compareTo(addedEntry) > 0) - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber()); - } - discardEntry(addedEntry); - } - else if (entryFromMap.compareTo(addedEntry) < 0) - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber()); - } - discardEntry(entryFromMap); - } - - addedEntry.setLatestValueReference(entryReferenceFromMap); - } - - return addedEntry; - } - - /** - * Returns: - * - *
    - *
  • the existing entry reference if the value already exists in the map, or
  • - *
  • referenceToValue if none exists, or
  • - *
  • a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently - * adds and removes during execution of this method.
  • - *
- */ - private AtomicReference getOrPutIfAbsent(final Object key, final AtomicReference referenceToAddedValue) - { - AtomicReference latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); - - if(latestValueReference == null) - { - latestValueReference = _latestValuesMap.get(key); - if(latestValueReference == null) - { - return new AtomicReference(_newerEntryAlreadyBeenAndGone); - } - } - return latestValueReference; - } - - private void discardEntry(final QueueEntry entry) - { - if(entry.acquire()) - { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); - txn.dequeue(entry.getQueue(),entry.getMessage(), - new ServerTransaction.Action() - { - @Override - public void postCommit() - { - entry.delete(); - } - - @Override - public void onRollback() - { - - } - }); - } - } - - final class ConflationQueueEntry extends OrderedQueueEntry - { - - private AtomicReference _latestValueReference; - - private ConflationQueueEntry(final ConflationQueueList queueEntryList) - { - super(queueEntryList); - } - - public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message) - { - super(queueEntryList, message); - } - - @Override - public void release() - { - super.release(); - - discardIfReleasedEntryIsNoLongerLatest(); - } - - @Override - protected void onDelete() - { - if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) - { - Object key = getMessage().getMessageHeader().getHeader(_conflationKey); - _latestValuesMap.remove(key,_latestValueReference); - } - - } - - public void setLatestValueReference(final AtomicReference latestValueReference) - { - _latestValueReference = latestValueReference; - } - - private void discardIfReleasedEntryIsNoLongerLatest() - { - if(_latestValueReference != null) - { - if(_latestValueReference.get() != this) - { - discardEntry(this); - } - } - } - - } - - /** - * Exposed purposes of unit test only. - */ - Map> getLatestValuesMap() - { - return Collections.unmodifiableMap(_latestValuesMap); - } - - static class Factory implements QueueEntryListFactory - { - private final String _conflationKey; - - Factory(String conflationKey) - { - _conflationKey = conflationKey; - } - - @Override - public ConflationQueueList createQueueEntryList(final AMQQueue queue) - { - return new ConflationQueueList((ConflationQueue)queue, _conflationKey); - } - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java new file mode 100644 index 0000000000..c97a789f65 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; + +@ManagedObject( category = false, type="lvq" ) +public interface LastValueQueue> extends AMQQueue +{ + String LVQ_KEY = "lvqKey"; + + @ManagedAttribute + String getLvqKey(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java new file mode 100644 index 0000000000..9ee9125bcf --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class LastValueQueueImpl extends AbstractQueue implements LastValueQueue +{ + public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + + + protected LastValueQueueImpl(VirtualHostImpl virtualHost, + Map attributes) + { + super(virtualHost, attributes, entryList(attributes)); + } + + private static LastValueQueueList.Factory entryList(final Map attributes) + { + + String conflationKey = MapValueConverter.getStringAttribute(LVQ_KEY, + attributes, + DEFAULT_LVQ_KEY); + + // conflation key can still be null if it was present in the map with a null value + return new LastValueQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey); + } + + public String getConflationKey() + { + return ((LastValueQueueList)getEntries()).getConflationKey(); + } + + @Override + public Object getAttribute(final String name) + { + if(LVQ_KEY.equals(name)) + { + if(this instanceof LastValueQueueImpl) + { + return getConflationKey(); + } + } + return super.getAttribute(name); + } + + @Override + public String getLvqKey() + { + return getConflationKey(); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java new file mode 100644 index 0000000000..cd586e9629 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java @@ -0,0 +1,273 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; + +public class LastValueQueueList extends OrderedQueueEntryList +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueList.class); + + private static final HeadCreator HEAD_CREATOR = new HeadCreator() + { + + @Override + public ConflationQueueEntry createHead(final QueueEntryList list) + { + return ((LastValueQueueList)list).createHead(); + } + }; + + private final String _conflationKey; + private final ConcurrentHashMap> _latestValuesMap = + new ConcurrentHashMap>(); + + private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this); + private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this); + + public LastValueQueueList(LastValueQueueImpl queue, String conflationKey) + { + super(queue, HEAD_CREATOR); + _conflationKey = conflationKey; + } + + private ConflationQueueEntry createHead() + { + return new ConflationQueueEntry(this); + } + + public String getConflationKey() + { + return _conflationKey; + } + + @Override + protected ConflationQueueEntry createQueueEntry(ServerMessage message) + { + return new ConflationQueueEntry(this, message); + } + + + + /** + * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. + */ + @Override + public ConflationQueueEntry add(final ServerMessage message) + { + final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message); + + final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); + if (keyValue != null) + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue); + } + + final AtomicReference referenceToEntry = new AtomicReference(addedEntry); + AtomicReference entryReferenceFromMap; + ConflationQueueEntry entryFromMap; + + // Iterate until we have got a valid atomic reference object and either the referent is newer than the current + // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value + // indicating that the reference object is no longer valid (it is being removed from the map). + boolean keepTryingToUpdateEntryReference; + do + { + do + { + entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry); + + // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value) + entryFromMap = entryReferenceFromMap.get(); + } + while(entryFromMap == _deleteInProgress); + + boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0; + + keepTryingToUpdateEntryReference = entryFromMapIsOlder + && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry); + } + while(keepTryingToUpdateEntryReference); + + if (entryFromMap == _newerEntryAlreadyBeenAndGone) + { + discardEntry(addedEntry); + } + else if (entryFromMap.compareTo(addedEntry) > 0) + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber()); + } + discardEntry(addedEntry); + } + else if (entryFromMap.compareTo(addedEntry) < 0) + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber()); + } + discardEntry(entryFromMap); + } + + addedEntry.setLatestValueReference(entryReferenceFromMap); + } + + return addedEntry; + } + + /** + * Returns: + * + *
    + *
  • the existing entry reference if the value already exists in the map, or
  • + *
  • referenceToValue if none exists, or
  • + *
  • a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently + * adds and removes during execution of this method.
  • + *
+ */ + private AtomicReference getOrPutIfAbsent(final Object key, final AtomicReference referenceToAddedValue) + { + AtomicReference latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + + if(latestValueReference == null) + { + latestValueReference = _latestValuesMap.get(key); + if(latestValueReference == null) + { + return new AtomicReference(_newerEntryAlreadyBeenAndGone); + } + } + return latestValueReference; + } + + private void discardEntry(final QueueEntry entry) + { + if(entry.acquire()) + { + ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); + txn.dequeue(entry.getQueue(),entry.getMessage(), + new ServerTransaction.Action() + { + @Override + public void postCommit() + { + entry.delete(); + } + + @Override + public void onRollback() + { + + } + }); + } + } + + final class ConflationQueueEntry extends OrderedQueueEntry + { + + private AtomicReference _latestValueReference; + + private ConflationQueueEntry(final LastValueQueueList queueEntryList) + { + super(queueEntryList); + } + + public ConflationQueueEntry(LastValueQueueList queueEntryList, ServerMessage message) + { + super(queueEntryList, message); + } + + @Override + public void release() + { + super.release(); + + discardIfReleasedEntryIsNoLongerLatest(); + } + + @Override + protected void onDelete() + { + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) + { + Object key = getMessage().getMessageHeader().getHeader(_conflationKey); + _latestValuesMap.remove(key,_latestValueReference); + } + + } + + public void setLatestValueReference(final AtomicReference latestValueReference) + { + _latestValueReference = latestValueReference; + } + + private void discardIfReleasedEntryIsNoLongerLatest() + { + if(_latestValueReference != null) + { + if(_latestValueReference.get() != this) + { + discardEntry(this); + } + } + } + + } + + /** + * Exposed purposes of unit test only. + */ + Map> getLatestValuesMap() + { + return Collections.unmodifiableMap(_latestValuesMap); + } + + static class Factory implements QueueEntryListFactory + { + private final String _conflationKey; + + Factory(String conflationKey) + { + _conflationKey = conflationKey; + } + + @Override + public LastValueQueueList createQueueEntryList(final AMQQueue queue) + { + return new LastValueQueueList((LastValueQueueImpl)queue, _conflationKey); + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 3ecf134b5f..0797bbd4e9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - import java.util.Map; -public abstract class OutOfOrderQueue extends AbstractQueue +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public abstract class OutOfOrderQueue> extends AbstractQueue { protected OutOfOrderQueue(VirtualHostImpl virtualHost, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java index 04e94f24f6..8e8732d595 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java @@ -1,52 +1,33 @@ /* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; -import java.util.Map; - -public class PriorityQueue extends OutOfOrderQueue +@ManagedObject( category = false, type="priority" ) +public interface PriorityQueue> extends AMQQueue { + String PRIORITIES = "priorities"; - public static final int DEFAULT_PRIORITY_LEVELS = 10; - - protected PriorityQueue(VirtualHostImpl virtualHost, - Map attributes) - { - super(virtualHost, attributes, entryList(attributes)); - } - - private static PriorityQueueList.Factory entryList(final Map attributes) - { - final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes, - DEFAULT_PRIORITY_LEVELS); - - return new PriorityQueueList.Factory(priorities); - } - - public int getPriorities() - { - return getEntries().getPriorities(); - } + @ManagedAttribute + int getPriorities(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java new file mode 100644 index 0000000000..4f73bea8e6 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueImpl.java @@ -0,0 +1,64 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class PriorityQueueImpl extends OutOfOrderQueue implements PriorityQueue +{ + + public static final int DEFAULT_PRIORITY_LEVELS = 10; + + protected PriorityQueueImpl(VirtualHostImpl virtualHost, + Map attributes) + { + super(virtualHost, attributes, entryList(attributes)); + } + + private static PriorityQueueList.Factory entryList(final Map attributes) + { + final Integer priorities = MapValueConverter.getIntegerAttribute(PRIORITIES, attributes, + DEFAULT_PRIORITY_LEVELS); + + return new PriorityQueueList.Factory(priorities); + } + + @Override + public int getPriorities() + { + return getEntries().getPriorities(); + } + + @Override + public Object getAttribute(final String name) + { + + if(PRIORITIES.equals(name)) + { + return getPriorities(); + } + + return super.getAttribute(name); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index d47b7ce2cd..f72c8cd57a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -26,7 +26,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList { - public PriorityQueueList(final PriorityQueue queue, + public PriorityQueueList(final PriorityQueueImpl queue, final HeadCreator headCreator) { super(queue, headCreator); @@ -43,12 +43,12 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList return null; } }; - private final PriorityQueue _queue; + private final PriorityQueueImpl _queue; private final PriorityQueueEntrySubList[] _priorityLists; private final int _priorities; private final int _priorityOffset; - public PriorityQueueMasterList(PriorityQueue queue, int priorities) + public PriorityQueueMasterList(PriorityQueueImpl queue, int priorities) { super(queue, DUMMY_HEAD_CREATOR); _queue = queue; @@ -67,7 +67,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList } @Override - public PriorityQueue getQueue() + public PriorityQueueImpl getQueue() { return _queue; } @@ -196,7 +196,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList public PriorityQueueList createQueueEntryList(AMQQueue queue) { - return new PriorityQueueMasterList((PriorityQueue) queue, _priorities); + return new PriorityQueueMasterList((PriorityQueueImpl) queue, _priorities); } } @@ -212,7 +212,7 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList }; private int _listPriority; - public PriorityQueueEntrySubList(PriorityQueue queue, int listPriority) + public PriorityQueueEntrySubList(PriorityQueueImpl queue, int listPriority) { super(queue, HEAD_CREATOR); _listPriority = listPriority; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 757db35af9..c1687de86e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -78,9 +78,9 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); - ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, Queue.SORT_KEY); - ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, Queue.LVQ_KEY); - ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, Queue.PRIORITIES); + ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, SortedQueue.SORT_KEY); + ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, LastValueQueue.LVQ_KEY); + ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES); ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); @@ -108,7 +108,7 @@ public class QueueArgumentsConverter } if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) { - modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); + modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY); } if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 5f7ea14889..19f9f6c427 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,54 +20,16 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -import java.util.Map; +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; -public class SortedQueue extends OutOfOrderQueue +@ManagedObject( category = false, type="sorted" ) +public interface SortedQueue> extends AMQQueue { - //Lock object to synchronize enqueue. Used instead of the object - //monitor to prevent lock order issues with consumer sendLocks - //and consumer updates in the super classes - private final Object _sortedQueueLock = new Object(); - private final String _sortedPropertyName; - - protected SortedQueue(VirtualHostImpl virtualHost, - Map attributes, - QueueEntryListFactory factory) - { - super(virtualHost, attributes, factory); - _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes); - } - - - protected SortedQueue(VirtualHostImpl virtualHost, - Map attributes) - { - this(virtualHost, - attributes, - new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes))); - } - + String SORT_KEY = "sortKey"; + @ManagedAttribute + String getSortKey(); - public String getSortedPropertyName() - { - return _sortedPropertyName; - } - @Override - public void enqueue(final ServerMessage message, - final Action action) - { - synchronized (_sortedQueueLock) - { - super.enqueue(message, action); - } - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 990ca76d67..92abe30442 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -36,17 +36,17 @@ public class SortedQueueEntryList implements QueueEntryList private SortedQueueEntry _root; private long _entryId = Long.MIN_VALUE; private final Object _lock = new Object(); - private final SortedQueue _queue; + private final SortedQueueImpl _queue; private final String _propertyName; - public SortedQueueEntryList(final SortedQueue queue, final String propertyName) + public SortedQueueEntryList(final SortedQueueImpl queue, final String propertyName) { _queue = queue; _head = new SortedQueueEntry(this); _propertyName = propertyName; } - public SortedQueue getQueue() + public SortedQueueImpl getQueue() { return _queue; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java index ed80b33234..5cab126374 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java @@ -32,6 +32,6 @@ public class SortedQueueEntryListFactory implements QueueEntryListFactory @Override public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) { - return new SortedQueueEntryList((SortedQueue) queue, _propertyName); + return new SortedQueueEntryList((SortedQueueImpl) queue, _propertyName); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java new file mode 100644 index 0000000000..3115f6f581 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class SortedQueueImpl extends OutOfOrderQueue implements SortedQueue +{ + //Lock object to synchronize enqueue. Used instead of the object + //monitor to prevent lock order issues with consumer sendLocks + //and consumer updates in the super classes + private final Object _sortedQueueLock = new Object(); + private final String _sortedPropertyName; + + protected SortedQueueImpl(VirtualHostImpl virtualHost, + Map attributes, + QueueEntryListFactory factory) + { + super(virtualHost, attributes, factory); + _sortedPropertyName = MapValueConverter.getStringAttribute(SORT_KEY,attributes); + } + + + protected SortedQueueImpl(VirtualHostImpl virtualHost, + Map attributes) + { + this(virtualHost, + attributes, + new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(SORT_KEY, attributes))); + } + + + + public String getSortedPropertyName() + { + return _sortedPropertyName; + } + + @Override + public void enqueue(final ServerMessage message, + final Action action) + { + synchronized (_sortedQueueLock) + { + super.enqueue(message, action); + } + } + + @Override + public Object getAttribute(final String name) + { + + if(SORT_KEY.equals(name)) + { + return getSortedPropertyName(); + } + + return super.getAttribute(name); + } + + @Override + public String getSortKey() + { + return getSortedPropertyName(); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java index 44578375e0..8d040064a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java @@ -20,15 +20,9 @@ */ package org.apache.qpid.server.queue; -import java.util.Map; +import org.apache.qpid.server.model.ManagedObject; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class StandardQueue extends AbstractQueue +@ManagedObject( category = false, type="standard" ) +public interface StandardQueue> extends AMQQueue { - public StandardQueue(final VirtualHostImpl virtualHost, - final Map arguments) - { - super(virtualHost, arguments, new StandardQueueEntryList.Factory()); - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java index 0b8b7f7c6b..341e2365c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java @@ -34,7 +34,7 @@ public class StandardQueueEntryList extends OrderedQueueEntryList } }; - public StandardQueueEntryList(final StandardQueue queue) + public StandardQueueEntryList(final StandardQueueImpl queue) { super(queue, HEAD_CREATOR); } @@ -50,7 +50,7 @@ public class StandardQueueEntryList extends OrderedQueueEntryList public StandardQueueEntryList createQueueEntryList(AMQQueue queue) { - return new StandardQueueEntryList((StandardQueue) queue); + return new StandardQueueEntryList((StandardQueueImpl) queue); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java new file mode 100644 index 0000000000..4dc57b14a7 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; + +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class StandardQueueImpl extends AbstractQueue implements StandardQueue +{ + public StandardQueueImpl(final VirtualHostImpl virtualHost, + final Map arguments) + { + super(virtualHost, arguments, new StandardQueueEntryList.Factory()); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index c49f626242..b4a54aa5cb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -70,9 +70,12 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.LastValueQueueImpl; import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; @@ -627,9 +630,9 @@ public abstract class AbstractVirtualHost> exte // make a copy as we may augment (with an ID for example) attributes = new LinkedHashMap(attributes); - if (attributes.containsKey(Queue.QUEUE_TYPE)) + if (attributes.containsKey(Queue.TYPE)) { - String typeAttribute = MapValueConverter.getStringAttribute(Queue.QUEUE_TYPE, attributes, null); + String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null); QueueType queueType = null; try { @@ -639,15 +642,15 @@ public abstract class AbstractVirtualHost> exte { throw new IllegalArgumentException("Unsupported queue type :" + typeAttribute); } - if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) + if (queueType == QueueType.LVQ && attributes.get(LastValueQueue.LVQ_KEY) == null) { - attributes.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY); + attributes.put(LastValueQueue.LVQ_KEY, LastValueQueueImpl.DEFAULT_LVQ_KEY); } - else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) + else if (queueType == QueueType.PRIORITY && attributes.get(PriorityQueue.PRIORITIES) == null) { - attributes.put(Queue.PRIORITIES, 10); + attributes.put(PriorityQueue.PRIORITIES, 10); } - else if (queueType == QueueType.SORTED && attributes.get(Queue.SORT_KEY) == null) + else if (queueType == QueueType.SORTED && attributes.get(SortedQueue.SORT_KEY) == null) { throw new IllegalArgumentException("Sort key is not specified for sorted queue"); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index ed2de21b6b..8ec78e952e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -195,12 +195,12 @@ public class AMQQueueFactoryTest extends QpidTestCase attributes.put(Queue.ID, UUID.randomUUID()); attributes.put(Queue.NAME, "testPriorityQueue"); - attributes.put(Queue.PRIORITIES, 5); + attributes.put(PriorityQueue.PRIORITIES, 5); AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass()); + assertEquals("Queue not a priority queue", PriorityQueueImpl.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); verifyRegisteredQueueCount(1); } @@ -217,7 +217,7 @@ public class AMQQueueFactoryTest extends QpidTestCase AMQQueue queue = _queueFactory.createQueue(attributes); - assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass()); + assertEquals("Queue not a simple queue", StandardQueueImpl.class, queue.getClass()); verifyQueueRegistered(queueName); //verify that no alternate exchange or DLQ were produced diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index eb0ab8633e..c62b541191 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -1105,7 +1105,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase } - private static class NonAsyncDeliverQueue extends AbstractQueue + private static class NonAsyncDeliverQueue extends AbstractQueue { public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHostImpl vhost) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java deleted file mode 100644 index 5c049b7d3f..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import junit.framework.TestCase; - -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -public class ConflationQueueListTest extends TestCase -{ - private static final String CONFLATION_KEY = "CONFLATION_KEY"; - - private static final String TEST_KEY_VALUE = "testKeyValue"; - private static final String TEST_KEY_VALUE1 = "testKeyValue1"; - private static final String TEST_KEY_VALUE2 = "testKeyValue2"; - - private ConflationQueueList _list; - private ConflationQueue _queue; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - Map queueAttributes = new HashMap(); - queueAttributes.put(Queue.ID, UUID.randomUUID()); - queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.LVQ_KEY, CONFLATION_KEY); - final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); - when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); - when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _queue = new ConflationQueue(virtualHost, queueAttributes); - _list = (ConflationQueueList) _queue.getEntries(); - } - - public void testListHasNoEntries() - { - int numberOfEntries = countEntries(_list); - assertEquals(0, numberOfEntries); - } - - public void testAddMessageWithoutConflationKeyValue() - { - ServerMessage message = createTestServerMessage(null); - - _list.add(message); - int numberOfEntries = countEntries(_list); - assertEquals(1, numberOfEntries); - } - - public void testAddAndDiscardMessageWithoutConflationKeyValue() - { - ServerMessage message = createTestServerMessage(null); - - QueueEntry addedEntry = _list.add(message); - addedEntry.delete(); - - int numberOfEntries = countEntries(_list); - assertEquals(0, numberOfEntries); - } - - public void testAddMessageWithConflationKeyValue() - { - ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); - - _list.add(message); - int numberOfEntries = countEntries(_list); - assertEquals(1, numberOfEntries); - } - - public void testAddAndRemoveMessageWithConflationKeyValue() - { - ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); - - QueueEntry addedEntry = _list.add(message); - addedEntry.delete(); - - int numberOfEntries = countEntries(_list); - assertEquals(0, numberOfEntries); - } - - public void testAddTwoMessagesWithDifferentConflationKeyValue() - { - ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); - ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); - - _list.add(message1); - _list.add(message2); - - int numberOfEntries = countEntries(_list); - assertEquals(2, numberOfEntries); - } - - public void testAddTwoMessagesWithSameConflationKeyValue() - { - ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); - ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); - - _list.add(message1); - _list.add(message2); - - int numberOfEntries = countEntries(_list); - assertEquals(1, numberOfEntries); - } - - public void testSupersededEntryIsDiscardedOnRelease() - { - ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); - ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); - - QueueEntry entry1 = _list.add(message1); - entry1.acquire(); // simulate an in-progress delivery to consumer - - _list.add(message2); - assertFalse(entry1.isDeleted()); - - assertEquals(2, countEntries(_list)); - - entry1.release(); // simulate consumer rollback/recover - - assertEquals(1, countEntries(_list)); - assertTrue(entry1.isDeleted()); - } - - public void testConflationMapMaintained() - { - assertEquals(0, _list.getLatestValuesMap().size()); - - ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); - - QueueEntry addedEntry = _list.add(message); - - assertEquals(1, countEntries(_list)); - assertEquals(1, _list.getLatestValuesMap().size()); - - addedEntry.delete(); - - assertEquals(0, countEntries(_list)); - assertEquals(0, _list.getLatestValuesMap().size()); - } - - public void testConflationMapMaintainedWithDifferentConflationKeyValue() - { - - assertEquals(0, _list.getLatestValuesMap().size()); - - ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); - ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); - - QueueEntry addedEntry1 = _list.add(message1); - QueueEntry addedEntry2 = _list.add(message2); - - assertEquals(2, countEntries(_list)); - assertEquals(2, _list.getLatestValuesMap().size()); - - addedEntry1.delete(); - addedEntry2.delete(); - - assertEquals(0, countEntries(_list)); - assertEquals(0, _list.getLatestValuesMap().size()); - } - - private int countEntries(ConflationQueueList list) - { - QueueEntryIterator iterator = - list.iterator(); - int count = 0; - while(iterator.advance()) - { - count++; - } - return count; - } - - private ServerMessage createTestServerMessage(String conflationKeyValue) - { - ServerMessage mockMessage = mock(ServerMessage.class); - - AMQMessageHeader messageHeader = mock(AMQMessageHeader.class); - when(messageHeader.getHeader(CONFLATION_KEY)).thenReturn(conflationKeyValue); - when(mockMessage.getMessageHeader()).thenReturn(messageHeader); - - MessageReference messageReference = mock(MessageReference.class); - when(mockMessage.newReference()).thenReturn(messageReference); - when(messageReference.getMessage()).thenReturn(mockMessage); - - return mockMessage; - } - - private AMQQueue createTestQueue() - { - AMQQueue queue = mock(AMQQueue.class); - VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); - when(queue.getVirtualHost()).thenReturn(virtualHost); - - return queue; - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java new file mode 100644 index 0000000000..bc1d89a280 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import junit.framework.TestCase; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public class LastValueQueueListTest extends TestCase +{ + private static final String CONFLATION_KEY = "CONFLATION_KEY"; + + private static final String TEST_KEY_VALUE = "testKeyValue"; + private static final String TEST_KEY_VALUE1 = "testKeyValue1"; + private static final String TEST_KEY_VALUE2 = "testKeyValue2"; + + private LastValueQueueList _list; + private LastValueQueueImpl _queue; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + Map queueAttributes = new HashMap(); + queueAttributes.put(Queue.ID, UUID.randomUUID()); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(LastValueQueue.LVQ_KEY, CONFLATION_KEY); + final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); + when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); + when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); + _queue = new LastValueQueueImpl(virtualHost, queueAttributes); + _list = (LastValueQueueList) _queue.getEntries(); + } + + public void testListHasNoEntries() + { + int numberOfEntries = countEntries(_list); + assertEquals(0, numberOfEntries); + } + + public void testAddMessageWithoutConflationKeyValue() + { + ServerMessage message = createTestServerMessage(null); + + _list.add(message); + int numberOfEntries = countEntries(_list); + assertEquals(1, numberOfEntries); + } + + public void testAddAndDiscardMessageWithoutConflationKeyValue() + { + ServerMessage message = createTestServerMessage(null); + + QueueEntry addedEntry = _list.add(message); + addedEntry.delete(); + + int numberOfEntries = countEntries(_list); + assertEquals(0, numberOfEntries); + } + + public void testAddMessageWithConflationKeyValue() + { + ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); + + _list.add(message); + int numberOfEntries = countEntries(_list); + assertEquals(1, numberOfEntries); + } + + public void testAddAndRemoveMessageWithConflationKeyValue() + { + ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); + + QueueEntry addedEntry = _list.add(message); + addedEntry.delete(); + + int numberOfEntries = countEntries(_list); + assertEquals(0, numberOfEntries); + } + + public void testAddTwoMessagesWithDifferentConflationKeyValue() + { + ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); + ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); + + _list.add(message1); + _list.add(message2); + + int numberOfEntries = countEntries(_list); + assertEquals(2, numberOfEntries); + } + + public void testAddTwoMessagesWithSameConflationKeyValue() + { + ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); + ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); + + _list.add(message1); + _list.add(message2); + + int numberOfEntries = countEntries(_list); + assertEquals(1, numberOfEntries); + } + + public void testSupersededEntryIsDiscardedOnRelease() + { + ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE); + ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE); + + QueueEntry entry1 = _list.add(message1); + entry1.acquire(); // simulate an in-progress delivery to consumer + + _list.add(message2); + assertFalse(entry1.isDeleted()); + + assertEquals(2, countEntries(_list)); + + entry1.release(); // simulate consumer rollback/recover + + assertEquals(1, countEntries(_list)); + assertTrue(entry1.isDeleted()); + } + + public void testConflationMapMaintained() + { + assertEquals(0, _list.getLatestValuesMap().size()); + + ServerMessage message = createTestServerMessage(TEST_KEY_VALUE); + + QueueEntry addedEntry = _list.add(message); + + assertEquals(1, countEntries(_list)); + assertEquals(1, _list.getLatestValuesMap().size()); + + addedEntry.delete(); + + assertEquals(0, countEntries(_list)); + assertEquals(0, _list.getLatestValuesMap().size()); + } + + public void testConflationMapMaintainedWithDifferentConflationKeyValue() + { + + assertEquals(0, _list.getLatestValuesMap().size()); + + ServerMessage message1 = createTestServerMessage(TEST_KEY_VALUE1); + ServerMessage message2 = createTestServerMessage(TEST_KEY_VALUE2); + + QueueEntry addedEntry1 = _list.add(message1); + QueueEntry addedEntry2 = _list.add(message2); + + assertEquals(2, countEntries(_list)); + assertEquals(2, _list.getLatestValuesMap().size()); + + addedEntry1.delete(); + addedEntry2.delete(); + + assertEquals(0, countEntries(_list)); + assertEquals(0, _list.getLatestValuesMap().size()); + } + + private int countEntries(LastValueQueueList list) + { + QueueEntryIterator iterator = + list.iterator(); + int count = 0; + while(iterator.advance()) + { + count++; + } + return count; + } + + private ServerMessage createTestServerMessage(String conflationKeyValue) + { + ServerMessage mockMessage = mock(ServerMessage.class); + + AMQMessageHeader messageHeader = mock(AMQMessageHeader.class); + when(messageHeader.getHeader(CONFLATION_KEY)).thenReturn(conflationKeyValue); + when(mockMessage.getMessageHeader()).thenReturn(messageHeader); + + MessageReference messageReference = mock(MessageReference.class); + when(mockMessage.newReference()).thenReturn(messageReference); + when(messageReference.getMessage()).thenReturn(mockMessage); + + return mockMessage; + } + + private AMQQueue createTestQueue() + { + AMQQueue queue = mock(AMQQueue.class); + VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); + when(queue.getVirtualHost()).thenReturn(virtualHost); + + return queue; + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 50cf4f44aa..ad677a98a7 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -23,6 +23,10 @@ package org.apache.qpid.server.queue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -33,10 +37,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - public class PriorityQueueListTest extends QpidTestCase { private static final byte[] PRIORITIES = {4, 5, 5, 4}; @@ -54,11 +54,11 @@ public class PriorityQueueListTest extends QpidTestCase Map queueAttributes = new HashMap(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getName()); - queueAttributes.put(Queue.PRIORITIES, 10); + queueAttributes.put(PriorityQueue.PRIORITIES, 10); final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - PriorityQueue queue = new PriorityQueue(virtualHost, queueAttributes); + PriorityQueueImpl queue = new PriorityQueueImpl(virtualHost, queueAttributes); _list = (PriorityQueueList) queue.getEntries(); for (int i = 0; i < PRIORITIES.length; i++) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 09cec61909..fdc0411f95 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -20,7 +20,12 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; + import junit.framework.AssertionFailedError; import org.apache.qpid.server.consumer.ConsumerImpl; @@ -28,20 +33,13 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; -import java.util.ArrayList; -import java.util.EnumSet; - -import org.apache.qpid.server.model.Queue; - -import static org.mockito.Mockito.when; - public class PriorityQueueTest extends AbstractQueueTestBase { @Override public void setUp() throws Exception { - setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3)); + setArguments(Collections.singletonMap(PriorityQueue.PRIORITIES,(Object)3)); super.setUp(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index ef2a8ebccf..a5a25994ca 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -199,7 +199,7 @@ public abstract class QueueEntryImplTestBase extends TestCase when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); + StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); OrderedQueueEntryList queueEntryList = (OrderedQueueEntryList) queue.getEntries(); // create test entries diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java index 0ea4ee930d..dc4609734d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java @@ -30,13 +30,13 @@ import org.apache.qpid.server.queue.SortedQueueEntry.Colour; */ public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList { - public SelfValidatingSortedQueueEntryList(SortedQueue queue, String propertyName) + public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue, String propertyName) { super(queue, propertyName); } @Override - public SortedQueue getQueue() + public SortedQueueImpl getQueue() { return super.getQueue(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index cb2e9bf062..61e396ac27 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -47,7 +47,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); + StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); queueEntryList = (OrderedQueueEntryList) queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index 0334b9d93b..bcae391b92 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -19,7 +19,15 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; @@ -30,15 +38,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class SortedQueueEntryListTest extends QueueEntryListTestBase { private static SelfValidatingSortedQueueEntryList _sqel; @@ -71,7 +70,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase private final static String keysSorted[] = keys.clone(); - private SortedQueue _testQueue; + private SortedQueueImpl _testQueue; @Override protected void setUp() throws Exception @@ -81,19 +80,19 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase attributes.put(Queue.NAME, getName()); attributes.put(Queue.DURABLE, false); attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - attributes.put(Queue.SORT_KEY, "KEY"); + attributes.put(SortedQueue.SORT_KEY, "KEY"); // Create test list final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _testQueue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory() + _testQueue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory() { @Override public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) { - return new SelfValidatingSortedQueueEntryList((SortedQueue) queue, "KEY"); + return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY"); } }); _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries(); @@ -149,7 +148,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase } @Override - protected SortedQueue getTestQueue() + protected SortedQueueImpl getTestQueue() { return _testQueue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 2a96f08c53..90c4a82747 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -19,6 +19,10 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,10 +36,6 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class SortedQueueEntryTest extends QueueEntryImplTestBase { @@ -51,18 +51,18 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase attributes.put(Queue.NAME, getName()); attributes.put(Queue.DURABLE, false); attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - attributes.put(Queue.SORT_KEY, "KEY"); + attributes.put(SortedQueue.SORT_KEY, "KEY"); final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - SortedQueue queue = new SortedQueue(virtualHost, attributes, new QueueEntryListFactory() + SortedQueueImpl queue = new SortedQueueImpl(virtualHost, attributes, new QueueEntryListFactory() { @Override public SortedQueueEntryList createQueueEntryList(final AMQQueue queue) { - return new SelfValidatingSortedQueueEntryList((SortedQueue) queue, "KEY"); + return new SelfValidatingSortedQueueEntryList((SortedQueueImpl) queue, "KEY"); } }); _queueEntryList = (SelfValidatingSortedQueueEntryList) queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 27cbe7dc8d..2bab20a1b0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -38,7 +38,7 @@ import static org.mockito.Mockito.when; public class StandardQueueEntryListTest extends QueueEntryListTestBase { - private StandardQueue _testQueue; + private StandardQueueImpl _testQueue; private StandardQueueEntryList _sqel; private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count"; @@ -55,7 +55,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - _testQueue = new StandardQueue(virtualHost, queueAttributes); + _testQueue = new StandardQueueImpl(virtualHost, queueAttributes); _sqel = (StandardQueueEntryList) _testQueue.getEntries(); for(int i = 1; i <= 100; i++) @@ -101,7 +101,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class); when(virtualHost.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); - StandardQueue queue = new StandardQueue(virtualHost, queueAttributes); + StandardQueueImpl queue = new StandardQueueImpl(virtualHost, queueAttributes); return (StandardQueueEntryList) queue.getEntries(); } @@ -132,7 +132,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase } @Override - protected StandardQueue getTestQueue() + protected StandardQueueImpl getTestQueue() { return _testQueue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 37519e7a0b..f6d04175c5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -49,7 +49,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getQname()); queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); - final StandardQueue queue = new StandardQueue(getVirtualHost(), queueAttributes); + final StandardQueueImpl queue = new StandardQueueImpl(getVirtualHost(), queueAttributes); queue.open(); setQueue(queue); @@ -72,7 +72,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "testActiveConsumerCount"); queueAttributes.put(Queue.OWNER, "testOwner"); - final StandardQueue queue = new StandardQueue(getVirtualHost(), queueAttributes); + final StandardQueueImpl queue = new StandardQueueImpl(getVirtualHost(), queueAttributes); queue.open(); //verify adding an active consumer increases the count final MockConsumer consumer1 = new MockConsumer(); @@ -180,7 +180,7 @@ public class StandardQueueTest extends AbstractQueueTestBase queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, "test"); // create queue with overridden method deliverAsync - StandardQueue testQueue = new StandardQueue(getVirtualHost(), queueAttributes) + StandardQueueImpl testQueue = new StandardQueueImpl(getVirtualHost(), queueAttributes) { @Override public void deliverAsync(QueueConsumer sub) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html index 410a5d23ca..9a24e23407 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html @@ -35,16 +35,16 @@ Queue Type: - +    - +    - +    - + diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 606f95e349..c1442e5edc 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,7 +277,7 @@ define(["dojo/_base/xhr", "exclusive", "owner", "lifetimePolicy", - "queueType", + "type", "typeQualifier", "alertRepeatGap", "alertRepeatGapUnits", @@ -359,14 +359,14 @@ define(["dojo/_base/xhr", bytesDepth = formatter.formatBytes( this.queueData["unacknowledgedBytes"] ); this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value; this.unacknowledgedBytesUnits.innerHTML = bytesDepth.units + ")"; - this.queueType.innerHTML = entities.encode(this.queueData[ "queueType" ]); - if (this.queueData.queueType == "standard") + this["type" ].innerHTML = entities.encode(this.queueData[ "type" ]); + if (this.queueData["type"] == "standard") { this.typeQualifier.style.display = "none"; } else { - this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.queueType] + ": " + this.queueData[queueTypeKeys[this.queueData.queueType]] + ")"); + this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData[ "type" ]] + ": " + this.queueData[queueTypeKeys[this.queueData[ "type" ]]] + ")"); } if(this.queueData["messageGroupKey"]) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js index 64e821b2dd..64b31309d9 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js @@ -96,7 +96,7 @@ define(["dojo/_base/xhr", } } else if (!typeSpecificFields.hasOwnProperty(propName) || - formValues.queueType === typeSpecificFields[ propName ]) { + formValues[ "type" ] === typeSpecificFields[ propName ]) { if(formValues[ propName ] !== "") { if (fieldConverters.hasOwnProperty(propName)) { @@ -130,7 +130,7 @@ define(["dojo/_base/xhr", theForm = registry.byId("formAddQueue"); array.forEach(theForm.getDescendants(), function(widget) { - if(widget.name === "queueType") { + if(widget.name === "type") { widget.on("change", function(isChecked) { var objId = widget.id + ":fields"; @@ -195,4 +195,4 @@ define(["dojo/_base/xhr", }; return addQueue; - }); \ No newline at end of file + }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 252deb3100..89b7327957 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -47,7 +47,7 @@
Type:
- +
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 74183eafc5..e95ed9a383 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -185,7 +185,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN @Override public String getQueueType() { - return (String) _queue.getAttribute(Queue.QUEUE_TYPE); + return (String) _queue.getAttribute(Queue.TYPE); } public boolean isDurable() diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index 2df196eff6..a81de49fb9 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -131,7 +131,7 @@ public class QueueMBeanTest extends QpidTestCase public void testQueueType() throws Exception { - when(_mockQueue.getAttribute(Queue.QUEUE_TYPE)).thenReturn(QUEUE_TYPE); + when(_mockQueue.getAttribute(Queue.TYPE)).thenReturn(QUEUE_TYPE); MBeanTestUtils.assertMBeanAttribute(_queueMBean, "queueType", QUEUE_TYPE); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java deleted file mode 100644 index 0e59e9cceb..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java +++ /dev/null @@ -1,574 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.url.AMQBindingURL; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class ConflationQueueTest extends QpidBrokerTestCase -{ - private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class); - - private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; - private static final String KEY_PROPERTY = "key"; - - private static final int MSG_COUNT = 400; - - private String _queueName; - private Queue _queue; - private Connection _producerConnection; - private MessageProducer _producer; - private Session _producerSession; - private Connection _consumerConnection; - private Session _consumerSession; - private MessageConsumer _consumer; - - protected void setUp() throws Exception - { - super.setUp(); - - _queueName = getTestQueueName(); - _producerConnection = getConnection(); - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - public void testConflation() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - createConflationQueue(_producerSession); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - - _producer.close(); - _producerSession.close(); - _producerConnection.close(); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - - List messages = new ArrayList(); - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - } - - public void testConflationWithRelease() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - - createConflationQueue(_producerSession); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT/2; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - - } - - // HACK to do something synchronous - ((AMQSession)_producerSession).sync(); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - List messages = new ArrayList(); - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - _consumerSession.close(); - _consumerConnection.close(); - - - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - - - // HACK to do something synchronous - ((AMQSession)_producerSession).sync(); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - messages = new ArrayList(); - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - } - - - public void testConflationWithReleaseAfterNewPublish() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - - createConflationQueue(_producerSession); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT/2; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - - // HACK to do something synchronous - ((AMQSession)_producerSession).sync(); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - List messages = new ArrayList(); - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - _consumer.close(); - - for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - - // HACK to do something synchronous - ((AMQSession)_producerSession).sync(); - - - // this causes the "old" messages to be released - _consumerSession.close(); - _consumerConnection.close(); - - - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - messages = new ArrayList(); - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - } - - public void testConflatedQueueDepth() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - createConflationQueue(_producerSession); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - - final long queueDepth = ((AMQSession)_producerSession).getQueueDepth((AMQDestination)_queue, true); - - assertEquals(10, queueDepth); - } - - public void testConflationBrowser() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - createConflationQueue(_producerSession); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - - } - - ((AMQSession)_producerSession).sync(); - - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); - AMQQueue browseQueue = new AMQQueue(url); - - _consumer = _consumerSession.createConsumer(browseQueue); - _consumerConnection.start(); - Message received; - List messages = new ArrayList(); - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - messages.clear(); - - _producer.send(nextMessage(MSG_COUNT, _producerSession)); - - ((AMQSession)_producerSession).sync(); - - while((received = _consumer.receive(1000))!=null) - { - messages.add(received); - } - assertEquals("Unexpected number of messages received",1,messages.size()); - assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - - - _producer.close(); - _producerSession.close(); - _producerConnection.close(); - } - - public void testConflation2Browsers() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - createConflationQueue(_producerSession); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - - ((AMQSession)_producerSession).sync(); - - AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); - AMQQueue browseQueue = new AMQQueue(url); - - _consumer = _consumerSession.createConsumer(browseQueue); - MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue); - _consumerConnection.start(); - List messages = new ArrayList(); - List messages2 = new ArrayList(); - Message received = _consumer.receive(1000); - Message received2 = consumer2.receive(1000); - - while(received!=null || received2!=null) - { - if(received != null) - { - messages.add(received); - } - if(received2 != null) - { - messages2.add(received2); - } - - - received = _consumer.receive(1000); - received2 = consumer2.receive(1000); - - } - - assertEquals("Unexpected number of messages received on first browser",10,messages.size()); - assertEquals("Unexpected number of messages received on second browser",10,messages2.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - msg = messages2.get(i); - assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - - _producer.close(); - _producerSession.close(); - _producerConnection.close(); - } - - public void testParallelProductionAndConsumption() throws Exception - { - createConflationQueue(_producerSession); - - // Start producing threads that send messages - BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); - messageProducer1.startSendingMessages(); - BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); - messageProducer2.startSendingMessages(); - - Map lastReceivedMessages = receiveMessages(messageProducer1); - - messageProducer1.join(); - messageProducer2.join(); - - final Map lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); - assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); - final Map lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); - assertEquals(lastSentMessages1, lastSentMessages2); - - assertEquals("The last message sent for each key should match the last message received for that key", - lastSentMessages1, lastReceivedMessages); - - assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); - } - - private Map receiveMessages(BackgroundMessageProducer producer) throws Exception - { - producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); - - _consumerConnection = getConnection(); - int smallPrefetchToEncourageConflation = 1; - _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation); - - LOGGER.info("Starting to receive"); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - Map messageSequenceNumbersByKey = new HashMap(); - - Message message; - int numberOfShutdownsReceived = 0; - int numberOfMessagesReceived = 0; - while(numberOfShutdownsReceived < 2) - { - message = _consumer.receive(10000); - assertNotNull(message); - - if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) - { - numberOfShutdownsReceived++; - } - else - { - numberOfMessagesReceived++; - putMessageInMap(message, messageSequenceNumbersByKey); - } - } - - LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); - - return messageSequenceNumbersByKey; - } - - private void putMessageInMap(Message message, Map messageSequenceNumbersByKey) throws JMSException - { - String keyValue = message.getStringProperty(KEY_PROPERTY); - Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); - messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); - } - - private class BackgroundMessageProducer - { - static final String SHUTDOWN = "SHUTDOWN"; - - private final String _threadName; - - private volatile Exception _exception; - - private Thread _thread; - private Map _messageSequenceNumbersByKey = new HashMap(); - private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); - - public BackgroundMessageProducer(String threadName) - { - _threadName = threadName; - } - - public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException - { - final long latchTimeout = 60000; - boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); - assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); - LOGGER.info("Quarter of messages sent"); - } - - public Exception getException() - { - return _exception; - } - - public Map getMessageSequenceNumbersByKey() - { - return Collections.unmodifiableMap(_messageSequenceNumbersByKey); - } - - public void startSendingMessages() - { - Runnable messageSender = new Runnable() - { - @Override - public void run() - { - try - { - LOGGER.info("Starting to send in background thread"); - Connection producerConnection = getConnection(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer backgroundProducer = producerSession.createProducer(_queue); - for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) - { - Message message = nextMessage(messageNumber, producerSession, 2); - backgroundProducer.send(message); - - putMessageInMap(message, _messageSequenceNumbersByKey); - _quarterOfMessagesSentLatch.countDown(); - } - - Message shutdownMessage = producerSession.createMessage(); - shutdownMessage.setBooleanProperty(SHUTDOWN, true); - backgroundProducer.send(shutdownMessage); - - LOGGER.info("Finished sending in background thread"); - } - catch (Exception e) - { - _exception = e; - throw new RuntimeException(e); - } - } - }; - - _thread = new Thread(messageSender); - _thread.setName(_threadName); - _thread.start(); - } - - public void join() throws InterruptedException - { - final int timeoutInMillis = 120000; - _thread.join(timeoutInMillis); - assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); - } - } - - private void createConflationQueue(Session session) throws AMQException - { - final Map arguments = new HashMap(); - arguments.put("qpid.last_value_queue_key",KEY_PROPERTY); - ((AMQSession) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); - _queue = new AMQQueue("amq.direct", _queueName); - ((AMQSession) session).declareAndBind((AMQDestination)_queue); - } - - private Message nextMessage(int msg, Session producerSession) throws JMSException - { - return nextMessage(msg, producerSession, 10); - } - - private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException - { - Message send = producerSession.createTextMessage("Message: " + msg); - - send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues)); - send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); - - return send; - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/LastValueQueueTest.java new file mode 100644 index 0000000000..dc30c02951 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/LastValueQueueTest.java @@ -0,0 +1,574 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.url.AMQBindingURL; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class LastValueQueueTest extends QpidBrokerTestCase +{ + private static final Logger LOGGER = Logger.getLogger(LastValueQueueTest.class); + + private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; + private static final String KEY_PROPERTY = "key"; + + private static final int MSG_COUNT = 400; + + private String _queueName; + private Queue _queue; + private Connection _producerConnection; + private MessageProducer _producer; + private Session _producerSession; + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; + + protected void setUp() throws Exception + { + super.setUp(); + + _queueName = getTestQueueName(); + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public void testConflation() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + Message received; + + List messages = new ArrayList(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + + public void testConflationWithRelease() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT/2; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + + } + + // HACK to do something synchronous + ((AMQSession)_producerSession).sync(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + Message received; + List messages = new ArrayList(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + _consumerSession.close(); + _consumerConnection.close(); + + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + + // HACK to do something synchronous + ((AMQSession)_producerSession).sync(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + messages = new ArrayList(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + } + + + public void testConflationWithReleaseAfterNewPublish() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT/2; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + // HACK to do something synchronous + ((AMQSession)_producerSession).sync(); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + Message received; + List messages = new ArrayList(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + _consumer.close(); + + for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + // HACK to do something synchronous + ((AMQSession)_producerSession).sync(); + + + // this causes the "old" messages to be released + _consumerSession.close(); + _consumerConnection.close(); + + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + messages = new ArrayList(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + } + + public void testConflatedQueueDepth() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + final long queueDepth = ((AMQSession)_producerSession).getQueueDepth((AMQDestination)_queue, true); + + assertEquals(10, queueDepth); + } + + public void testConflationBrowser() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + + } + + ((AMQSession)_producerSession).sync(); + + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); + AMQQueue browseQueue = new AMQQueue(url); + + _consumer = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); + Message received; + List messages = new ArrayList(); + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received",10,messages.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + messages.clear(); + + _producer.send(nextMessage(MSG_COUNT, _producerSession)); + + ((AMQSession)_producerSession).sync(); + + while((received = _consumer.receive(1000))!=null) + { + messages.add(received); + } + assertEquals("Unexpected number of messages received",1,messages.size()); + assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testConflation2Browsers() throws Exception + { + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + createConflationQueue(_producerSession); + _producer = _producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + _producer.send(nextMessage(msg, _producerSession)); + } + + ((AMQSession)_producerSession).sync(); + + AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'"); + AMQQueue browseQueue = new AMQQueue(url); + + _consumer = _consumerSession.createConsumer(browseQueue); + MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue); + _consumerConnection.start(); + List messages = new ArrayList(); + List messages2 = new ArrayList(); + Message received = _consumer.receive(1000); + Message received2 = consumer2.receive(1000); + + while(received!=null || received2!=null) + { + if(received != null) + { + messages.add(received); + } + if(received2 != null) + { + messages2.add(received2); + } + + + received = _consumer.receive(1000); + received2 = consumer2.receive(1000); + + } + + assertEquals("Unexpected number of messages received on first browser",10,messages.size()); + assertEquals("Unexpected number of messages received on second browser",10,messages2.size()); + + for(int i = 0 ; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + msg = messages2.get(i); + assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + + _producer.close(); + _producerSession.close(); + _producerConnection.close(); + } + + public void testParallelProductionAndConsumption() throws Exception + { + createConflationQueue(_producerSession); + + // Start producing threads that send messages + BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); + messageProducer1.startSendingMessages(); + BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); + messageProducer2.startSendingMessages(); + + Map lastReceivedMessages = receiveMessages(messageProducer1); + + messageProducer1.join(); + messageProducer2.join(); + + final Map lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); + final Map lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); + + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + } + + private Map receiveMessages(BackgroundMessageProducer producer) throws Exception + { + producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); + + _consumerConnection = getConnection(); + int smallPrefetchToEncourageConflation = 1; + _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation); + + LOGGER.info("Starting to receive"); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + Map messageSequenceNumbersByKey = new HashMap(); + + Message message; + int numberOfShutdownsReceived = 0; + int numberOfMessagesReceived = 0; + while(numberOfShutdownsReceived < 2) + { + message = _consumer.receive(10000); + assertNotNull(message); + + if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) + { + numberOfShutdownsReceived++; + } + else + { + numberOfMessagesReceived++; + putMessageInMap(message, messageSequenceNumbersByKey); + } + } + + LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); + + return messageSequenceNumbersByKey; + } + + private void putMessageInMap(Message message, Map messageSequenceNumbersByKey) throws JMSException + { + String keyValue = message.getStringProperty(KEY_PROPERTY); + Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); + messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); + } + + private class BackgroundMessageProducer + { + static final String SHUTDOWN = "SHUTDOWN"; + + private final String _threadName; + + private volatile Exception _exception; + + private Thread _thread; + private Map _messageSequenceNumbersByKey = new HashMap(); + private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); + + public BackgroundMessageProducer(String threadName) + { + _threadName = threadName; + } + + public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException + { + final long latchTimeout = 60000; + boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); + assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); + LOGGER.info("Quarter of messages sent"); + } + + public Exception getException() + { + return _exception; + } + + public Map getMessageSequenceNumbersByKey() + { + return Collections.unmodifiableMap(_messageSequenceNumbersByKey); + } + + public void startSendingMessages() + { + Runnable messageSender = new Runnable() + { + @Override + public void run() + { + try + { + LOGGER.info("Starting to send in background thread"); + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { + Message message = nextMessage(messageNumber, producerSession, 2); + backgroundProducer.send(message); + + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); + } + + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + backgroundProducer.send(shutdownMessage); + + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + throw new RuntimeException(e); + } + } + }; + + _thread = new Thread(messageSender); + _thread.setName(_threadName); + _thread.start(); + } + + public void join() throws InterruptedException + { + final int timeoutInMillis = 120000; + _thread.join(timeoutInMillis); + assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); + } + } + + private void createConflationQueue(Session session) throws AMQException + { + final Map arguments = new HashMap(); + arguments.put("qpid.last_value_queue_key",KEY_PROPERTY); + ((AMQSession) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new AMQQueue("amq.direct", _queueName); + ((AMQSession) session).declareAndBind((AMQDestination)_queue); + } + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + return nextMessage(msg, producerSession, 10); + } + + private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + + send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues)); + send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); + + return send; + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index d857bb4ce0..9949102af8 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -50,9 +50,11 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.LastValueQueueImpl; import org.apache.qpid.server.queue.PriorityQueue; -import org.apache.qpid.server.queue.StandardQueue; +import org.apache.qpid.server.queue.PriorityQueueImpl; +import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -553,18 +555,18 @@ public class VirtualHostMessageStoreTest extends QpidTestCase if (usePriority) { - assertEquals("Queue is no longer a Priority Queue", PriorityQueue.class, queue.getClass()); + assertEquals("Queue is no longer a Priority Queue", PriorityQueueImpl.class, queue.getClass()); assertEquals("Priority Queue does not have set priorities", - DEFAULT_PRIORTY_LEVEL, ((PriorityQueue) queue).getPriorities()); + DEFAULT_PRIORTY_LEVEL, ((PriorityQueueImpl) queue).getPriorities()); } else if (lastValueQueue) { - assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass()); - assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey()); + assertEquals("Queue is no longer a LastValue Queue", LastValueQueueImpl.class, queue.getClass()); + assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((LastValueQueueImpl) queue).getConflationKey()); } else { - assertEquals("Queue is not 'simple'", StandardQueue.class, queue.getClass()); + assertEquals("Queue is not 'simple'", StandardQueueImpl.class, queue.getClass()); } assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner()); @@ -660,12 +662,12 @@ public class VirtualHostMessageStoreTest extends QpidTestCase if (usePriority) { - queueArguments.put(Queue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); + queueArguments.put(PriorityQueue.PRIORITIES, DEFAULT_PRIORTY_LEVEL); } if (lastValueQueue) { - queueArguments.put(Queue.LVQ_KEY, LVQ_KEY); + queueArguments.put(LastValueQueue.LVQ_KEY, LVQ_KEY); } queueArguments.put(Queue.ID, UUIDGenerator.generateRandomUUID()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 931974942f..3bd91faa3e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -28,7 +28,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.queue.NotificationCheckTest; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.queue.StandardQueue; +import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; import org.apache.qpid.test.utils.JMXTestUtils; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -660,7 +660,7 @@ public class QueueManagementTest extends QpidBrokerTestCase final Object messageGroupKey = "test"; final Map arguments = new HashMap(2); arguments.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, messageGroupKey); - arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueue.SHARED_MSG_GROUP_ARG_VALUE); + arguments.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, StandardQueueImpl.SHARED_MSG_GROUP_ARG_VALUE); managedBroker.createNewQueue(queueName, null, true, arguments); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index a2bb05dd00..47a25dbbec 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -43,6 +43,9 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.test.utils.TestBrokerConfiguration; public class Asserts @@ -113,11 +116,11 @@ public class Asserts Queue.ALTERNATE_EXCHANGE, Queue.OWNER, Queue.NO_LOCAL, - Queue.LVQ_KEY, - Queue.SORT_KEY, + LastValueQueue.LVQ_KEY, + SortedQueue.SORT_KEY, Queue.MESSAGE_GROUP_KEY, Queue.MESSAGE_GROUP_SHARED_GROUPS, - Queue.PRIORITIES, + PriorityQueue.PRIORITIES, ConfiguredObject.CONTEXT); assertEquals("Unexpected value of queue attribute " + Queue.NAME, queueName, queueData.get(Queue.NAME)); @@ -127,9 +130,9 @@ public class Asserts queueData.get(Queue.STATE)); assertEquals("Unexpected value of queue attribute " + Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name(), queueData.get(Queue.LIFETIME_POLICY)); - assertEquals("Unexpected value of queue attribute " + Queue.QUEUE_TYPE, + assertEquals("Unexpected value of queue attribute " + Queue.TYPE, queueType, - queueData.get(Queue.QUEUE_TYPE)); + queueData.get(Queue.TYPE)); if (expectedAttributes == null) { assertEquals("Unexpected value of queue attribute " + Queue.EXCLUSIVE, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 421c609e46..4535425ea4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -37,7 +37,10 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.queue.ConflationQueue; +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.server.queue.LastValueQueueImpl; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.server.queue.SortedQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.StandardVirtualHost; import org.apache.qpid.util.FileUtils; @@ -193,15 +196,15 @@ public class VirtualHostRestTest extends QpidRestTestCase createQueue(queueName + "-standard", "standard", null); Map sortedQueueAttributes = new HashMap(); - sortedQueueAttributes.put(Queue.SORT_KEY, "sortme"); + sortedQueueAttributes.put(SortedQueue.SORT_KEY, "sortme"); createQueue(queueName + "-sorted", "sorted", sortedQueueAttributes); Map priorityQueueAttributes = new HashMap(); - priorityQueueAttributes.put(Queue.PRIORITIES, 10); + priorityQueueAttributes.put(PriorityQueue.PRIORITIES, 10); createQueue(queueName + "-priority", "priority", priorityQueueAttributes); Map lvqQueueAttributes = new HashMap(); - lvqQueueAttributes.put(Queue.LVQ_KEY, "LVQ"); + lvqQueueAttributes.put(LastValueQueue.LVQ_KEY, "LVQ"); createQueue(queueName + "-lvq", "lvq", lvqQueueAttributes); Map hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); @@ -223,9 +226,9 @@ public class VirtualHostRestTest extends QpidRestTestCase assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, priorityQueue.get(Queue.DURABLE)); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(Queue.SORT_KEY)); - assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(Queue.LVQ_KEY)); - assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(Queue.PRIORITIES)); + assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(SortedQueue.SORT_KEY)); + assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(LastValueQueue.LVQ_KEY)); + assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } public void testPutCreateExchange() throws Exception @@ -271,7 +274,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "lvq", lvqQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, lvqQueue.get(Queue.DURABLE)); - assertEquals("Unexpected lvq key attribute", ConflationQueue.DEFAULT_LVQ_KEY, lvqQueue.get(Queue.LVQ_KEY)); + assertEquals("Unexpected lvq key attribute", LastValueQueueImpl.DEFAULT_LVQ_KEY, lvqQueue.get(LastValueQueue.LVQ_KEY)); } public void testPutCreateSortedQueueWithoutKey() throws Exception @@ -302,7 +305,7 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName , "priority", priorityQueue); assertEquals("Unexpected value of queue attribute " + Queue.DURABLE, Boolean.TRUE, priorityQueue.get(Queue.DURABLE)); - assertEquals("Unexpected number of priorities", 10, priorityQueue.get(Queue.PRIORITIES)); + assertEquals("Unexpected number of priorities", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } public void testPutCreateStandardQueueWithoutType() throws Exception @@ -401,17 +404,17 @@ public class VirtualHostRestTest extends QpidRestTestCase Map sortedQueueAttributes = new HashMap(); sortedQueueAttributes.putAll(attributes); - sortedQueueAttributes.put(Queue.SORT_KEY, "sortme"); + sortedQueueAttributes.put(SortedQueue.SORT_KEY, "sortme"); createQueue(queueName + "-sorted", "sorted", sortedQueueAttributes); Map priorityQueueAttributes = new HashMap(); priorityQueueAttributes.putAll(attributes); - priorityQueueAttributes.put(Queue.PRIORITIES, 10); + priorityQueueAttributes.put(PriorityQueue.PRIORITIES, 10); createQueue(queueName + "-priority", "priority", priorityQueueAttributes); Map lvqQueueAttributes = new HashMap(); lvqQueueAttributes.putAll(attributes); - lvqQueueAttributes.put(Queue.LVQ_KEY, "LVQ"); + lvqQueueAttributes.put(LastValueQueue.LVQ_KEY, "LVQ"); createQueue(queueName + "-lvq", "lvq", lvqQueueAttributes); Map hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); @@ -429,9 +432,9 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName + "-priority", "priority", priorityQueue, attributes); Asserts.assertQueue(queueName + "-lvq", "lvq", lvqQueue, attributes); - assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(Queue.SORT_KEY)); - assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(Queue.LVQ_KEY)); - assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(Queue.PRIORITIES)); + assertEquals("Unexpected sorted key attribute", "sortme", sortedQueue.get(SortedQueue.SORT_KEY)); + assertEquals("Unexpected lvq key attribute", "LVQ", lvqQueue.get(LastValueQueue.LVQ_KEY)); + assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(PriorityQueue.PRIORITIES)); } @SuppressWarnings("unchecked") @@ -506,7 +509,7 @@ public class VirtualHostRestTest extends QpidRestTestCase queueData.put(Queue.DURABLE, Boolean.TRUE); if (queueType != null) { - queueData.put(Queue.QUEUE_TYPE, queueType); + queueData.put(Queue.TYPE, queueType); } if (attributes != null) { -- cgit v1.2.1