diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 16:23:43 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-08 16:23:43 +0000 |
| commit | ddb91338d5920311f9f7a4493df26c8c9e108e3d (patch) | |
| tree | 838f63479a6e536dee10530e3b67902213a98678 /qpid/java | |
| parent | 30d0a85f3a19f28eca299e56e7c959a9a810acc8 (diff) | |
| download | qpid-python-ddb91338d5920311f9f7a4493df26c8c9e108e3d.tar.gz | |
QPID-4307 : [Java Broker] prevent the copying/moving of messages onto queues on which the message already exists
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
24 files changed, 471 insertions, 24 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 58ffd88b85..e41bb948dc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -511,7 +511,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> Exchange altExchange = getAlternateExchange(); if(altExchange != null) { - return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -520,7 +520,24 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } else { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); + final BaseQueue[] baseQueues; + + if(message.isReferenced()) + { + ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size()); + for(BaseQueue q : queues) + { + if(!message.isReferenced(q)) + { + uniqueQueues.add(q); + } + } + baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]); + } + else + { + baseQueues = queues.toArray(new BaseQueue[queues.size()]); + } txn.enqueue(queues,message, new ServerTransaction.Action() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index d397dd57b6..d2789bfe58 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -21,10 +21,17 @@ package org.apache.qpid.server.message; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> @@ -33,10 +40,14 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater = + AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources"); + private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; private final Object _connectionReference; + private volatile Collection<UUID> _resources; public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference) @@ -117,6 +128,26 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI } @Override + final public MessageReference<X> newReference(TransactionLogResource object) + { + return new Reference(this, object); + } + + @Override + final public boolean isReferenced(TransactionLogResource resource) + { + Collection<UUID> resources = _resources; + return resources != null && resources.contains(resource.getId()); + } + + @Override + final public boolean isReferenced() + { + Collection<UUID> resources = _resources; + return resources != null && !resources.isEmpty(); + } + + @Override final public boolean isPersistent() { return _handle.getMetaData().isPersistent(); @@ -156,15 +187,52 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released"); private AbstractServerMessageImpl<X, T> _message; + private final UUID _resourceId; private volatile int _released; private Reference(final AbstractServerMessageImpl<X, T> message) { + this(message, null); + } + private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource) + { _message = message; + if(resource != null) + { + Collection<UUID> currentValue; + Collection<UUID> newValue; + _resourceId = resource.getId(); + do + { + currentValue = _message._resources; + + if(currentValue == null) + { + newValue = Collections.singleton(_resourceId); + } + else + { + if(currentValue.contains(_resourceId)) + { + throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource); + } + newValue = new ArrayList<>(currentValue.size()+1); + newValue.addAll(currentValue); + newValue.add(_resourceId); + } + + } + while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue)); + } + else + { + _resourceId = null; + } if(!_message.incrementReference()) { throw new MessageDeletedException(message.getMessageNumber()); } + } public X getMessage() @@ -176,6 +244,34 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI { if(_releasedUpdater.compareAndSet(this,0,1)) { + if(_resourceId != null) + { + Collection<UUID> currentValue; + Collection<UUID> newValue; + do + { + currentValue = _message._resources; + if(currentValue.size() == 1) + { + newValue = null; + } + else + { + UUID[] array = new UUID[currentValue.size()-1]; + int pos = 0; + for(UUID uuid : currentValue) + { + if(!_resourceId.equals(uuid)) + { + array[pos++] = uuid; + } + } + newValue = Arrays.asList(array); + } + } + while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue)); + + } _message.decrementReference(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java new file mode 100644 index 0000000000..7ab2625e63 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.store.TransactionLogResource; + +public class MessageAlreadyReferencedException extends RuntimeException +{ + MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource) + { + super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName()); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 8c35af8be4..81e6b13ffd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -20,10 +20,11 @@ */ package org.apache.qpid.server.message; +import java.nio.ByteBuffer; + import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; +import org.apache.qpid.server.store.TransactionLogResource; public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource { @@ -41,6 +42,12 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu MessageReference newReference(); + MessageReference newReference(TransactionLogResource object); + + boolean isReferenced(TransactionLogResource resource); + + boolean isReferenced(); + long getMessageNumber(); long getArrivalTime(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 8483e35b9e..545a1d941d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -2549,7 +2549,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final ServerTransaction txn, final Action<? super MessageInstance> postEnqueueAction) { - txn.enqueue(this,message, new ServerTransaction.Action() + if(!message.isReferenced(this)) + { + txn.enqueue(this, message, new ServerTransaction.Action() { MessageReference _reference = message.newReference(); @@ -2571,6 +2573,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } }); return 1; + } + else + { + return 0; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 96916a02e2..6c541d78ef 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -103,7 +103,7 @@ public abstract class QueueEntryImpl implements QueueEntry { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); _entryIdUpdater.set(this, entryId); populateInstanceProperties(); @@ -112,7 +112,7 @@ public abstract class QueueEntryImpl implements QueueEntry public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message == null ? null : message.newReference(); + _message = message == null ? null : message.newReference(queueEntryList.getQueue()); populateInstanceProperties(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 28dfc73a27..d4a91f2c0b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList { - AMQQueue getQueue(); + AMQQueue<?> getQueue(); QueueEntry add(ServerMessage message); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index d7779390b1..c775a70cb8 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.exchange; import static org.apache.qpid.common.AMQPFilterTypes.*; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,6 +44,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -497,6 +499,7 @@ public class TopicExchangeTest extends QpidTestCase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(message.getMessageNumber()).thenReturn(messageNumber); for(BaseQueue q : queues) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java new file mode 100644 index 0000000000..c90e406ba9 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; + +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.test.utils.QpidTestCase; + +public class AbstractServerMessageTest extends QpidTestCase +{ + private static class TestMessage<T extends StorableMessageMetaData> extends AbstractServerMessageImpl<TestMessage<T>,T> + { + + public TestMessage(final StoredMessage<T> handle, + final Object connectionReference) + { + super(handle, connectionReference); + } + + @Override + public String getInitialRoutingAddress() + { + return null; + } + + @Override + public AMQMessageHeader getMessageHeader() + { + return null; + } + + @Override + public long getSize() + { + return 0; + } + + @Override + public long getExpiration() + { + return 0; + } + + @Override + public long getArrivalTime() + { + return 0; + } + } + + private TransactionLogResource createQueue(String name) + { + TransactionLogResource queue = mock(TransactionLogResource.class); + when(queue.getId()).thenReturn(UUID.randomUUID()); + when(queue.getName()).thenReturn(name); + return queue; + } + + public void testReferences() + { + TransactionLogResource q1 = createQueue("1"); + TransactionLogResource q2 = createQueue("2"); + + TestMessage<StorableMessageMetaData> msg = new TestMessage<StorableMessageMetaData>(mock(StoredMessage.class),this); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + MessageReference<TestMessage<StorableMessageMetaData>> nonQueueRef = msg.newReference(); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + MessageReference<TestMessage<StorableMessageMetaData>> q1ref = msg.newReference(q1); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertFalse(msg.isReferenced(q2)); + + q1ref.release(); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + q1ref = msg.newReference(q1); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertFalse(msg.isReferenced(q2)); + + MessageReference<TestMessage<StorableMessageMetaData>> q2ref = msg.newReference(q2); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertTrue(msg.isReferenced(q2)); + + try + { + msg.newReference(q1); + fail("Should not be able to create a second reference to the same queue"); + } + catch (MessageAlreadyReferencedException e) + { + // pass + } + q2ref.release(); + assertTrue(msg.isReferenced()); + assertTrue(msg.isReferenced(q1)); + assertFalse(msg.isReferenced(q2)); + + q1ref.release(); + assertFalse(msg.isReferenced()); + assertFalse(msg.isReferenced(q1)); + + nonQueueRef.release(); + + try + { + msg.newReference(q1); + fail("Message should not allow new references as all references had been removed"); + } + catch(MessageDeletedException e) + { + // pass + } + + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 0def708fed..9255dbf42e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -60,6 +61,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -1157,6 +1159,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); return message; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index 70a35dc4aa..799fc71d74 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class LastValueQueueListTest extends TestCase @@ -220,6 +222,8 @@ public class LastValueQueueListTest extends TestCase MessageReference messageReference = mock(MessageReference.class); when(mockMessage.newReference()).thenReturn(messageReference); + when(mockMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference); + when(messageReference.getMessage()).thenReturn(mockMessage); return mockMessage; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index cc5f36098e..631731ecc0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; @@ -79,6 +81,7 @@ public class PriorityQueueListTest extends QpidTestCase when(message.getMessageHeader()).thenReturn(header); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(ref.getMessage()).thenReturn(message); when(header.getPriority()).thenReturn(PRIORITIES[i]); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 3a9f990846..40b6c1bebd 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; /** @@ -148,9 +150,11 @@ public abstract class QueueEntryImplTestBase extends TestCase _queueEntry.isAcquired()); assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer)); - assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition()); - assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2)); - assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer)); + assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition()); + assertFalse("Acquisition should not be able to be removed from the wrong consumer", + _queueEntry.removeAcquisitionFromConsumer(consumer2)); + assertTrue("Acquisition should be able to be removed once unlocked", + _queueEntry.removeAcquisitionFromConsumer(consumer)); assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired()); assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer()); @@ -254,6 +258,7 @@ public abstract class QueueEntryImplTestBase extends TestCase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message); entries[i] = entry; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index c89d2abeae..a0ab7cd454 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -19,14 +19,16 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import junit.framework.TestCase; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.qpid.server.store.TransactionLogResource; /** * Abstract test class for QueueEntryList implementations. @@ -96,6 +98,7 @@ public abstract class QueueEntryListTestBase extends TestCase AMQMessageHeader hdr = mock(AMQMessageHeader.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(message.getMessageHeader()).thenReturn(hdr); return message; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java index bd23aaa50a..79d7628a9c 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -33,6 +34,7 @@ import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -162,6 +164,7 @@ public class QueueMessageRecoveryTest extends QpidTestCase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(msg); when(msg.newReference()).thenReturn(ref); + when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(msg.getStoredMessage()).thenReturn(mock(StoredMessage.class)); return msg; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index eaed1427b2..a2d314d629 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,6 +36,7 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase @@ -70,6 +72,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); return (QueueEntryImpl) queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index bcc1e7bc0e..0c7f19bbd5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,6 +41,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SortedQueueEntryListTest extends QueueEntryListTestBase @@ -180,6 +182,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); when(message.getMessageNumber()).thenReturn(id); return message; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 268d334949..d9a176c688 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -19,6 +19,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class SortedQueueEntryTest extends QueueEntryImplTestBase @@ -97,6 +99,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); return _queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 89bb32e133..95c53c8428 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -36,6 +37,7 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class StandardQueueEntryListTest extends QueueEntryListTestBase @@ -73,6 +75,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); final QueueEntry bleh = _sqel.add(message); assertNotNull("QE should not have been null", bleh); @@ -163,6 +166,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); QueueEntry bleh = sqel.add(message); assertNotNull("QE should not have been null", bleh); entriesMap.put(i,bleh); @@ -264,6 +268,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase final MessageReference reference = mock(MessageReference.class); when(reference.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(reference); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference); entries[i] = (OrderedQueueEntry) queueEntryList.add(message); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index e16ba66391..848675bf5d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -165,6 +165,24 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override + public MessageReference newReference(final TransactionLogResource object) + { + return _messageReference; + } + + @Override + public boolean isReferenced(final TransactionLogResource resource) + { + return false; + } + + @Override + public boolean isReferenced() + { + return false; + } + + @Override public int hashCode() { final int prime = 31; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index 8992cf62c9..e0fbb6dcc3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -20,14 +20,15 @@ */ package org.apache.qpid.server.txn; +import java.nio.ByteBuffer; + import org.apache.commons.lang.NotImplementedException; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.StoredMessage; - -import java.nio.ByteBuffer; +import org.apache.qpid.server.store.TransactionLogResource; /** * Mock Server Message allowing its persistent flag to be controlled from test. @@ -57,6 +58,24 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + @Override + public MessageReference newReference(final TransactionLogResource object) + { + throw new NotImplementedException(); + } + + @Override + public boolean isReferenced(final TransactionLogResource resource) + { + return false; + } + + @Override + public boolean isReferenced() + { + return false; + } + public boolean isImmediate() { throw new NotImplementedException(); @@ -113,4 +132,4 @@ class MockServerMessage implements ServerMessage { return 0L; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index 9866207234..8c77876e1a 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageDeletedException; @@ -44,6 +45,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.store.TransactionLogResource; public class MessageServlet extends AbstractServlet { @@ -212,7 +214,11 @@ public class MessageServlet extends AbstractServlet @Override protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn) { - txn.move(entry, _destinationQueue); + ServerMessage msg = entry.getMessage(); + if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue)) + { + txn.move(entry, _destinationQueue); + } } } @@ -229,7 +235,11 @@ public class MessageServlet extends AbstractServlet @Override protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn) { - txn.copy(entry, _destinationQueue); + ServerMessage msg = entry.getMessage(); + if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue)) + { + txn.copy(entry, _destinationQueue); + } } } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index ca092fe6f8..5f5d6e7efe 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener @@ -519,7 +520,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final long messageId = message.getMessageNumber(); if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) + && (messageId <= toMessageId) + && !(message.isReferenced((TransactionLogResource)destinationQueue))) { txn.move(entry, destinationQueue); } @@ -571,8 +573,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN } VirtualHost<?,?,?> vhost = _queue.getParent(VirtualHost.class); - final Queue<?> queue = vhost.getChildByName(Queue.class, toQueue); - if (queue == null) + final Queue<?> destinationQueue = vhost.getChildByName(Queue.class, toQueue); + if (destinationQueue == null) { throw new OperationsException("No such queue \""+ toQueue +"\""); } @@ -591,9 +593,10 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final long messageId = message.getMessageNumber(); if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) + && (messageId <= toMessageId) + && !(message.isReferenced((TransactionLogResource)destinationQueue))) { - txn.copy(entry, queue); + txn.copy(entry, destinationQueue); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index a6a08d83f9..d0f133aa73 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -456,6 +456,61 @@ public class QueueManagementTest extends QpidBrokerTestCase assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8); } + + /** + * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface. + */ + public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception + { + final int numberOfMessagesToSend = 10; + sendMessage(_session, _sourceQueue, numberOfMessagesToSend); + syncSession(_session); + assertEquals("Unexpected queue depth after send", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend); + + // Copy first three messages to destination + long fromMessageId = amqMessagesIds.get(0); + long toMessageId = amqMessagesIds.get(2); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + + assertEquals("Unexpected queue depth on destination queue after first copy", + 3, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after first copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Now copy a further two messages to destination + fromMessageId = amqMessagesIds.get(7); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 5, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + // Attempt to copy mixture of messages already on and some not already on the queue + + fromMessageId = amqMessagesIds.get(5); + toMessageId = amqMessagesIds.get(8); + _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName); + assertEquals("Unexpected queue depth on destination queue after second copy", + 7, + _managedDestinationQueue.getMessageCount().intValue()); + assertEquals("Unexpected queue depth on source queue after second copy", + numberOfMessagesToSend, + _managedSourceQueue.getMessageCount().intValue()); + + assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6); + + + } + public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception { setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); |
