diff options
Diffstat (limited to 'qpid/java')
11 files changed, 59 insertions, 73 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index 0b7e300cec..027d220538 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -204,7 +204,7 @@ public class DiagnosticExchange extends AbstractExchange ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers); AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue")); - Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); queues.add(q); payload.enqueue(queues); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4da639567a..616f47bd24 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -191,7 +191,7 @@ public class DirectExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey(); - final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); + final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index d1bea3410b..1ee1f35de6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -249,7 +249,7 @@ public class HeadersExchange extends AbstractExchange _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); } boolean routed = false; - Collection<AMQQueue> queues = new ArrayList<AMQQueue>(); + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); for (Registration e : _bindings) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index 9bf82a3730..ec83161029 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -37,12 +37,12 @@ import org.apache.qpid.server.queue.AMQQueue; */ class Index { - private ConcurrentMap<AMQShortString, List<AMQQueue>> _index - = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index + = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>(); synchronized boolean add(AMQShortString key, AMQQueue queue) { - List<AMQQueue> queues = _index.get(key); + ArrayList<AMQQueue> queues = _index.get(key); if(queues == null) { queues = new ArrayList<AMQQueue>(); @@ -66,7 +66,7 @@ class Index synchronized boolean remove(AMQShortString key, AMQQueue queue) { - List<AMQQueue> queues = _index.get(key); + ArrayList<AMQQueue> queues = _index.get(key); if (queues != null) { queues = new ArrayList<AMQQueue>(queues); @@ -87,7 +87,7 @@ class Index return false; } - List<AMQQueue> get(AMQShortString key) + ArrayList<AMQQueue> get(AMQShortString key) { return _index.get(key); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index d07501a188..c18cc337fe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -32,7 +32,6 @@ import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; @@ -48,9 +47,6 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicInteger; import java.lang.ref.WeakReference; public class TopicExchange extends AbstractExchange @@ -532,7 +528,10 @@ public class TopicExchange extends AbstractExchange final AMQShortString routingKey = payload.getRoutingKey(); - Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey); + // The copy here is unfortunate, but not too bad relevant to the amount of + // things created and copied in getMatchedQueues + ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); + queues.addAll(getMatchedQueues(payload, routingKey)); if(queues == null || queues.isEmpty()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 0e5e7aa68c..a485649410 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -291,12 +291,17 @@ public class AMQMessage implements Filterable<AMQException> return this; } - /** Threadsafe. Increment the reference count on the message. */ public boolean incrementReference() { - if(_referenceCount.incrementAndGet() <= 1) + return incrementReference(1); + } + + /* Threadsafe. Increment the reference count on the message. */ + public boolean incrementReference(int count) + { + if(_referenceCount.addAndGet(count) <= 1) { - _referenceCount.decrementAndGet(); + _referenceCount.addAndGet(-count); return false; } else diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 9d769d7582..6b498d4d98 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; +import java.util.ArrayList; import java.util.Collection; public class IncomingMessage implements Filterable<RuntimeException> @@ -63,7 +64,7 @@ public class IncomingMessage implements Filterable<RuntimeException> * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done * by the message handle. */ - private Collection<AMQQueue> _destinationQueues; + private ArrayList<AMQQueue> _destinationQueues; private AMQProtocolSession _publisher; private MessageStore _messageStore; @@ -134,21 +135,13 @@ public class IncomingMessage implements Filterable<RuntimeException> if(_destinationQueues != null) { - for (AMQQueue q : _destinationQueues) + for (int i = 0; i < _destinationQueues.size(); i++) { - if(q.isDurable()) - { - - _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId); - } + _messageStore.enqueueMessage(_txnContext.getStoreContext(), + _destinationQueues.get(i), _messageId); } } - } - - - - } public AMQMessage deliverToQueues() @@ -157,10 +150,9 @@ public class IncomingMessage implements Filterable<RuntimeException> // we get a reference to the destination queues now so that we can clear the // transient message data as quickly as possible - Collection<AMQQueue> destinationQueues = _destinationQueues; if (_logger.isDebugEnabled()) { - _logger.debug("Delivering message " + _messageId + " to " + destinationQueues); + _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues); } AMQMessage message = null; @@ -178,10 +170,7 @@ public class IncomingMessage implements Filterable<RuntimeException> message.setExpiration(_expiration); message.setClientIdentifier(_publisher.getSessionIdentifier()); - - - - if ((destinationQueues == null) || destinationQueues.isEmpty()) + if ((_destinationQueues == null) || _destinationQueues.size() == 0) { if (isMandatory() || isImmediate()) @@ -196,10 +185,9 @@ public class IncomingMessage implements Filterable<RuntimeException> } else { - // TODO - int offset; - final int queueCount = destinationQueues.size(); + final int queueCount = _destinationQueues.size(); + message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -212,33 +200,16 @@ public class IncomingMessage implements Filterable<RuntimeException> offset = -offset; } } - - int i = 0; - for (AMQQueue q : destinationQueues) + for (int i = offset; i < queueCount; i++) { - if(++i > offset) - { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); - } + // normal deliver so add this message at the end. + _txnContext.deliver(_destinationQueues.get(i), message); } - i = 0; - if(offset != 0) + for (int i = 0; i < offset; i++) { - for (AMQQueue q : destinationQueues) - { - if(i++ < offset) - { - // Increment the references to this message for each queue delivery. - message.incrementReference(); - // normal deliver so add this message at the end. - _txnContext.deliver(q, message); - } - } + // normal deliver so add this message at the end. + _txnContext.deliver(_destinationQueues.get(i), message); } - } // we then allow the transactional context to do something with the message content @@ -329,7 +300,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _exchange.route(this); } - public void enqueue(final Collection<AMQQueue> queues) + public void enqueue(final ArrayList<AMQQueue> queues) { _destinationQueues = queues; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java index 89cead69b3..86f155d862 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java @@ -27,23 +27,23 @@ public class AccessResult GRANTED, REFUSED } - StringBuilder _authorizer; - AccessStatus _status; + private String _authorizer; + private AccessStatus _status; public AccessResult(ACLPlugin authorizer, AccessStatus status) { _status = status; - _authorizer = new StringBuilder(authorizer.getPluginName()); + _authorizer = authorizer.getPluginName(); } public void setAuthorizer(ACLPlugin authorizer) { - _authorizer.append(authorizer.getPluginName()); + _authorizer += authorizer.getPluginName(); } public String getAuthorizer() { - return _authorizer.toString(); + return _authorizer; } public void setStatus(AccessStatus status) @@ -58,8 +58,7 @@ public class AccessResult public void addAuthorizer(ACLPlugin accessManager) { - _authorizer.insert(0, "->"); - _authorizer.insert(0, accessManager.getPluginName()); + _authorizer = accessManager.getPluginName() + "->" + _authorizer; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index ca614e053a..712d3abc8f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -43,6 +43,8 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.mina.common.ByteBuffer; import javax.management.Notification; + +import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; @@ -304,7 +306,9 @@ public class AMQQueueAlertTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); - messages[i].enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + messages[i].enqueue(qs); messages[i].routingComplete(_messageStore, new MessageHandleFactory()); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index bf0a8a6d90..17f8a751de 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -47,6 +47,8 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.mina.common.ByteBuffer; import javax.management.JMException; + +import java.util.ArrayList; import java.util.LinkedList; import java.util.Collections; @@ -216,8 +218,9 @@ public class AMQQueueMBeanTest extends TestCase IncomingMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - - msg.enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); msg.routingComplete(_messageStore, new MessageHandleFactory()); msg.addContentBodyFrame(new ContentChunk() @@ -319,7 +322,9 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messageCount; i++) { IncomingMessage currentMessage = message(false, persistent); - currentMessage.enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + currentMessage.enqueue(qs); // route header currentMessage.routingComplete(_messageStore, new MessageHandleFactory()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index bbd6deffd3..afa0f84d71 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.util.NullApplicationRegistry; +import java.util.ArrayList; import java.util.LinkedList; import java.util.Set; import java.util.Collections; @@ -146,7 +147,9 @@ public class AckTest extends TestCase // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription - msg.enqueue(Collections.singleton(_queue)); + ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); + qs.add(_queue); + msg.enqueue(qs); msg.routingComplete(_messageStore, factory); if(msg.allContentReceived()) { |
