From ddb91338d5920311f9f7a4493df26c8c9e108e3d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 8 Aug 2014 16:23:43 +0000 Subject: QPID-4307 : [Java Broker] prevent the copying/moving of messages onto queues on which the message already exists git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616813 13f79535-47bb-0310-9956-ffa450edef68 --- .../management/plugin/servlet/rest/MessageServlet.java | 14 ++++++++++++-- .../java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java | 13 ++++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index 9866207234..8c77876e1a 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; + import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageDeletedException; @@ -44,6 +45,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.store.TransactionLogResource; public class MessageServlet extends AbstractServlet { @@ -212,7 +214,11 @@ public class MessageServlet extends AbstractServlet @Override protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn) { - txn.move(entry, _destinationQueue); + ServerMessage msg = entry.getMessage(); + if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue)) + { + txn.move(entry, _destinationQueue); + } } } @@ -229,7 +235,11 @@ public class MessageServlet extends AbstractServlet @Override protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn) { - txn.copy(entry, _destinationQueue); + ServerMessage msg = entry.getMessage(); + if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue)) + { + txn.copy(entry, _destinationQueue); + } } } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index ca092fe6f8..5f5d6e7efe 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -59,6 +59,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener @@ -519,7 +520,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final long messageId = message.getMessageNumber(); if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) + && (messageId <= toMessageId) + && !(message.isReferenced((TransactionLogResource)destinationQueue))) { txn.move(entry, destinationQueue); } @@ -571,8 +573,8 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN } VirtualHost vhost = _queue.getParent(VirtualHost.class); - final Queue queue = vhost.getChildByName(Queue.class, toQueue); - if (queue == null) + final Queue destinationQueue = vhost.getChildByName(Queue.class, toQueue); + if (destinationQueue == null) { throw new OperationsException("No such queue \""+ toQueue +"\""); } @@ -591,9 +593,10 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN final long messageId = message.getMessageNumber(); if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) + && (messageId <= toMessageId) + && !(message.isReferenced((TransactionLogResource)destinationQueue))) { - txn.copy(entry, queue); + txn.copy(entry, destinationQueue); } } -- cgit v1.2.1