summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java61
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java11
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java5
11 files changed, 59 insertions, 73 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
index 0b7e300cec..027d220538 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
@@ -204,7 +204,7 @@ public class DiagnosticExchange extends AbstractExchange
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
- Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
queues.add(q);
payload.enqueue(queues);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 4da639567a..616f47bd24 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -191,7 +191,7 @@ public class DirectExchange extends AbstractExchange
final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
- final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
+ final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
if (_logger.isDebugEnabled())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index d1bea3410b..1ee1f35de6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -249,7 +249,7 @@ public class HeadersExchange extends AbstractExchange
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
}
boolean routed = false;
- Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
index 9bf82a3730..ec83161029 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
@@ -37,12 +37,12 @@ import org.apache.qpid.server.queue.AMQQueue;
*/
class Index
{
- private ConcurrentMap<AMQShortString, List<AMQQueue>> _index
- = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
+ = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
synchronized boolean add(AMQShortString key, AMQQueue queue)
{
- List<AMQQueue> queues = _index.get(key);
+ ArrayList<AMQQueue> queues = _index.get(key);
if(queues == null)
{
queues = new ArrayList<AMQQueue>();
@@ -66,7 +66,7 @@ class Index
synchronized boolean remove(AMQShortString key, AMQQueue queue)
{
- List<AMQQueue> queues = _index.get(key);
+ ArrayList<AMQQueue> queues = _index.get(key);
if (queues != null)
{
queues = new ArrayList<AMQQueue>(queues);
@@ -87,7 +87,7 @@ class Index
return false;
}
- List<AMQQueue> get(AMQShortString key)
+ ArrayList<AMQQueue> get(AMQShortString key)
{
return _index.get(key);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index d07501a188..c18cc337fe 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
@@ -48,9 +47,6 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
@@ -532,7 +528,10 @@ public class TopicExchange extends AbstractExchange
final AMQShortString routingKey = payload.getRoutingKey();
- Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
+ // The copy here is unfortunate, but not too bad relevant to the amount of
+ // things created and copied in getMatchedQueues
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.addAll(getMatchedQueues(payload, routingKey));
if(queues == null || queues.isEmpty())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 0e5e7aa68c..a485649410 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -291,12 +291,17 @@ public class AMQMessage implements Filterable<AMQException>
return this;
}
- /** Threadsafe. Increment the reference count on the message. */
public boolean incrementReference()
{
- if(_referenceCount.incrementAndGet() <= 1)
+ return incrementReference(1);
+ }
+
+ /* Threadsafe. Increment the reference count on the message. */
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 1)
{
- _referenceCount.decrementAndGet();
+ _referenceCount.addAndGet(-count);
return false;
}
else
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 9d769d7582..6b498d4d98 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
@@ -63,7 +64,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private Collection<AMQQueue> _destinationQueues;
+ private ArrayList<AMQQueue> _destinationQueues;
private AMQProtocolSession _publisher;
private MessageStore _messageStore;
@@ -134,21 +135,13 @@ public class IncomingMessage implements Filterable<RuntimeException>
if(_destinationQueues != null)
{
- for (AMQQueue q : _destinationQueues)
+ for (int i = 0; i < _destinationQueues.size(); i++)
{
- if(q.isDurable())
- {
-
- _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId);
- }
+ _messageStore.enqueueMessage(_txnContext.getStoreContext(),
+ _destinationQueues.get(i), _messageId);
}
}
-
}
-
-
-
-
}
public AMQMessage deliverToQueues()
@@ -157,10 +150,9 @@ public class IncomingMessage implements Filterable<RuntimeException>
// we get a reference to the destination queues now so that we can clear the
// transient message data as quickly as possible
- Collection<AMQQueue> destinationQueues = _destinationQueues;
if (_logger.isDebugEnabled())
{
- _logger.debug("Delivering message " + _messageId + " to " + destinationQueues);
+ _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
}
AMQMessage message = null;
@@ -178,10 +170,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
message.setExpiration(_expiration);
message.setClientIdentifier(_publisher.getSessionIdentifier());
-
-
-
- if ((destinationQueues == null) || destinationQueues.isEmpty())
+ if ((_destinationQueues == null) || _destinationQueues.size() == 0)
{
if (isMandatory() || isImmediate())
@@ -196,10 +185,9 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
else
{
- // TODO
-
int offset;
- final int queueCount = destinationQueues.size();
+ final int queueCount = _destinationQueues.size();
+ message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
@@ -212,33 +200,16 @@ public class IncomingMessage implements Filterable<RuntimeException>
offset = -offset;
}
}
-
- int i = 0;
- for (AMQQueue q : destinationQueues)
+ for (int i = offset; i < queueCount; i++)
{
- if(++i > offset)
- {
- // Increment the references to this message for each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
- }
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(_destinationQueues.get(i), message);
}
- i = 0;
- if(offset != 0)
+ for (int i = 0; i < offset; i++)
{
- for (AMQQueue q : destinationQueues)
- {
- if(i++ < offset)
- {
- // Increment the references to this message for each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
- }
- }
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(_destinationQueues.get(i), message);
}
-
}
// we then allow the transactional context to do something with the message content
@@ -329,7 +300,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_exchange.route(this);
}
- public void enqueue(final Collection<AMQQueue> queues)
+ public void enqueue(final ArrayList<AMQQueue> queues)
{
_destinationQueues = queues;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
index 89cead69b3..86f155d862 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
@@ -27,23 +27,23 @@ public class AccessResult
GRANTED, REFUSED
}
- StringBuilder _authorizer;
- AccessStatus _status;
+ private String _authorizer;
+ private AccessStatus _status;
public AccessResult(ACLPlugin authorizer, AccessStatus status)
{
_status = status;
- _authorizer = new StringBuilder(authorizer.getPluginName());
+ _authorizer = authorizer.getPluginName();
}
public void setAuthorizer(ACLPlugin authorizer)
{
- _authorizer.append(authorizer.getPluginName());
+ _authorizer += authorizer.getPluginName();
}
public String getAuthorizer()
{
- return _authorizer.toString();
+ return _authorizer;
}
public void setStatus(AccessStatus status)
@@ -58,8 +58,7 @@ public class AccessResult
public void addAuthorizer(ACLPlugin accessManager)
{
- _authorizer.insert(0, "->");
- _authorizer.insert(0, accessManager.getPluginName());
+ _authorizer = accessManager.getPluginName() + "->" + _authorizer;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index ca614e053a..712d3abc8f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -43,6 +43,8 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
+
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
@@ -304,7 +306,9 @@ public class AMQQueueAlertTest extends TestCase
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
- messages[i].enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ messages[i].enqueue(qs);
messages[i].routingComplete(_messageStore, new MessageHandleFactory());
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index bf0a8a6d90..17f8a751de 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -47,6 +47,8 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
+
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
@@ -216,8 +218,9 @@ public class AMQQueueMBeanTest extends TestCase
IncomingMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
-
- msg.enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
msg.routingComplete(_messageStore, new MessageHandleFactory());
msg.addContentBodyFrame(new ContentChunk()
@@ -319,7 +322,9 @@ public class AMQQueueMBeanTest extends TestCase
for (int i = 0; i < messageCount; i++)
{
IncomingMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ currentMessage.enqueue(qs);
// route header
currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index bbd6deffd3..afa0f84d71 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
import java.util.Collections;
@@ -146,7 +147,9 @@ public class AckTest extends TestCase
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
- msg.enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
msg.routingComplete(_messageStore, factory);
if(msg.allContentReceived())
{