summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-01 15:40:47 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-01 15:40:47 +0000
commit6823d23dbeca328f4e860538a52015bc9313a6db (patch)
tree8af7823f6e4ac169835909ed6babf38d4ad42e57 /qpid/java
parentade50f17b8ffea099f8fffaaf283b2412f393bce (diff)
downloadqpid-python-6823d23dbeca328f4e860538a52015bc9313a6db.tar.gz
QPID-5504 : Moving routing to Exchange from session classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java63
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java62
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java67
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java88
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java18
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java42
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java170
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java48
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java24
14 files changed, 295 insertions, 366 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index b00d98637e..6a959df440 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -33,12 +33,14 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -374,9 +376,9 @@ public abstract class AbstractExchange implements Exchange
return getBindings().size();
}
- @Override
- public final List<? extends BaseQueue> route(final ServerMessage message,
- final InstanceProperties instanceProperties)
+
+ final List<? extends BaseQueue> route(final ServerMessage message,
+ final InstanceProperties instanceProperties)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
@@ -416,6 +418,59 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ List<? extends BaseQueue> queues = route(message, instanceProperties);
+
+ if(queues == null || queues.isEmpty())
+ {
+ Exchange altExchange = getAlternateExchange();
+ if(altExchange != null)
+ {
+ return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ else
+ {
+ final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+
+ txn.enqueue(queues,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ for(int i = 0; i < baseQueues.length; i++)
+ {
+ try
+ {
+ baseQueues[i].enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ _reference.release();
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return queues.size();
+ }
+ }
+
protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
final InstanceProperties instanceProperties);
@@ -679,4 +734,6 @@ public abstract class AbstractExchange implements Exchange
public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
}
+
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index e2582019cd..71d0f8b4dd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -204,22 +207,6 @@ public class DefaultExchange implements Exchange
}
@Override
- public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties)
- {
- AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
- if(q == null)
- {
- List<AMQQueue> noQueues = Collections.emptyList();
- return noQueues;
- }
- else
- {
- return Collections.singletonList(q);
- }
-
- }
-
- @Override
public boolean isBound(AMQQueue queue)
{
return _virtualHost.getQueue(queue.getName()) == queue;
@@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange
{
return _id;
}
+
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ if(q == null)
+ {
+ return 0;
+ }
+ else
+ {
+ txn.enqueue(q,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ q.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+ }
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 78455c9261..18e912e972 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -94,13 +95,17 @@ public interface Exchange extends ExchangeReferrer
void close() throws AMQException;
/**
- * Returns a list of queues to which to route this message. If there are
- * no queues the empty list must be returned.
- *
- * @return list of queues to which to route the message.
+ * Routes a message
+ * @param message the message to be routed
+ * @param instanceProperties the instance properties
+ * @param txn the transaction to enqueue within
+ * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
+ * @return the number of queues in which the message was enqueued performed
*/
- List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties);
-
+ int send(ServerMessage message,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ BaseQueue.PostEnqueueAction postEnqueueAction);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 80ccbe1649..2aa1d1f473 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -22,11 +22,11 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
-public interface QueueEntry extends Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -177,26 +177,17 @@ public interface QueueEntry extends Comparable<QueueEntry>
AMQQueue getQueue();
- ServerMessage getMessage();
-
long getSize();
boolean getDeliveredToConsumer();
boolean expired() throws AMQException;
- boolean isAvailable();
-
- boolean isAcquired();
-
- boolean acquire();
boolean acquire(Subscription sub);
boolean acquiredBySubscription();
boolean isAcquiredBy(Subscription subscription);
- void release();
-
void setRedelivered();
boolean isRedelivered();
@@ -207,16 +198,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean isRejectedBy(long subscriptionId);
- void delete();
-
- /**
- * Returns true if entry is either DEQUED or DELETED state.
- *
- * @return true if entry is either DEQUED or DELETED state
- */
- boolean isDeleted();
-
- void routeToAlternate();
+ int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
boolean isQueueDeleted();
@@ -241,5 +223,4 @@ public interface QueueEntry extends Comparable<QueueEntry>
Filterable asFilterable();
- InstanceProperties getInstanceProperties();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ed61f1acf6..461d493437 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction;
import java.util.EnumMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
else if(acquire())
{
- routeToAlternate();
+ routeToAlternate(null, null);
}
}
@@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
-
+ boolean autocommit = txn == null;
if (alternateExchange != null)
{
- List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties());
- final ServerMessage message = getMessage();
- if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
+ if(autocommit)
{
- queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties());
+ txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
+ int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
-
- if (queues != null && queues.size() != 0)
+ txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
{
- final List<? extends BaseQueue> rerouteQueues = queues;
- ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
-
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ public void postCommit()
{
- public void postCommit()
- {
- try
- {
- for (BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
-
- }
- });
-
- txn.dequeue(currentQueue, message, new ServerTransaction.Action()
- {
- public void postCommit()
- {
- delete();
- }
+ delete();
+ }
- public void onRollback()
- {
+ public void onRollback()
+ {
- }
- });
+ }
+ });
+ if(autocommit)
+ {
txn.commit();
}
+ return enqueues;
+
+ }
+ else
+ {
+ return 0;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d63d1946d3..87d11a892e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- if(_alternateExchange != null)
+
+ for(final QueueEntry entry : entries)
{
+ // TODO log requeues with a post enqueue action
+ int requeues = entry.routeToAlternate(null, txn);
- for(final QueueEntry entry : entries)
+ if(requeues == 0)
{
-
- List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
- if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
- {
- queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties());
- }
-
- final ServerMessage message = entry.getMessage();
- if(queues != null && queues.size() != 0)
- {
- final List<? extends BaseQueue> rerouteQueues = queues;
- txn.enqueue(rerouteQueues, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
-
- }
- });
- txn.dequeue(this, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- }
- });
- }
-
+ // TODO log discard
}
-
- _alternateExchange.removeReference(this);
}
- else
- {
- // TODO log discard
-
- for(final QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
- if(message != null)
- {
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- entry.delete();
- }
+ txn.commit();
- public void onRollback()
- {
- }
- });
- }
- }
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
}
- txn.commit();
for (Task task : _deleteTaskList)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 7bd525c90f..764549626a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -312,10 +312,9 @@ public class TopicExchangeTest extends QpidTestCase
private int routeMessage(String routingKey, long messageNumber) throws AMQException
{
- ServerMessage serverMessage = mock(ServerMessage.class);
- when(serverMessage.getRoutingKey()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY);
ServerMessage message = mock(ServerMessage.class);
+ when(message.getRoutingKey()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 2e3231e208..d3c866f747 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
public class MockQueueEntry implements QueueEntry
{
@@ -62,9 +63,9 @@ public class MockQueueEntry implements QueueEntry
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn)
{
-
+ return 0;
}
public boolean expired() throws AMQException
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index fe82f65115..bae5616042 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -102,6 +104,14 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
+ private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ entry.getQueue().checkCapacity(ServerSession.this);
+ }
+ };
public static interface MessageDispositionChangeListener
{
@@ -182,7 +192,9 @@ public class ServerSession extends Session
return isCommandsFull(id);
}
- public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
+ public int enqueue(final MessageTransferMessage message,
+ final InstanceProperties instanceProperties,
+ final Exchange exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -190,10 +202,10 @@ public class ServerSession extends Session
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
+ int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction);
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction);
incrementOutstandingTxnsIfNecessary();
+ return enqueues;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 973f706e0a..dcca696529 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
};
- List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
- if(queues.isEmpty() && exchange.getAlternateExchange() != null)
- {
- final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(message, instanceProperties);
- if (!queues.isEmpty())
- {
- exchangeInUse = alternateExchange;
- }
- else
- {
- exchangeInUse = exchange;
- }
- }
- else
- {
- exchangeInUse = exchange;
- }
+ int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(!queues.isEmpty())
+ if(enqueues != 0)
{
- serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 17d0e5cb64..357b565365 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -59,7 +59,6 @@ import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
void reject(final QueueEntry entry)
{
entry.setRedelivered();
- entry.routeToAlternate();
+ entry.routeToAlternate(null, null);
if(entry.isAcquiredBy(this))
{
entry.delete();
@@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
protected void sendToDLQOrDiscard(QueueEntry entry)
{
- final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
- if (alternateExchange != null)
+
+ int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
{
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
+ final AMQQueue queue = entry.getQueue();
+ final Exchange alternateExchange = queue.getAlternateExchange();
- if (destinationQueues == null || destinationQueues.isEmpty())
+ if(alternateExchange != null)
{
- entry.delete();
-
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
}
else
{
- entry.routeToAlternate();
-
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
- }
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
}
}
- else
- {
- entry.delete();
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
- }
}
private boolean isMaxDeliveryLimitReached(QueueEntry entry)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b7dc105cb7..c6d4151628 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+
+ private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
+ private final ImmediateAction _immediateAction = new ImmediateAction();
+
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
+ final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate();
+
final InstanceProperties instanceProperties =
new InstanceProperties()
{
@@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
case EXPIRATION:
return amqMessage.getExpiration();
case IMMEDIATE:
- return _currentMessage.getMessagePublishInfo().isImmediate();
+ return immediate;
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
@@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
};
- final List<? extends BaseQueue> destinationQueues =
- _currentMessage.getExchange().route(amqMessage, instanceProperties);
-
- if(destinationQueues == null || destinationQueues.isEmpty())
+ int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ immediate ? _immediateAction : _capacityCheckAction);
+ if(enqueues == 0)
{
handleUnroutableMessage(amqMessage);
}
else
{
- _transaction.enqueue(destinationQueues,
- amqMessage,
- new MessageDeliveryAction(amqMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
handle.flushToStore();
-
}
}
}
@@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(immediate)
{
- action = new ImmediateAction(queue);
+ action = new ImmediateAction();
}
else
{
@@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_reference.release();
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+
+ }
+ private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ {
+
+ public ImmediateAction()
{
- private final BaseQueue _queue;
+ }
- public ImmediateAction(BaseQueue queue)
- {
- _queue = queue;
- }
+ public void onEnqueue(QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
- public void onEnqueue(QueueEntry entry)
+ if (!entry.getDeliveredToConsumer() && entry.acquire())
{
- if (!entry.getDeliveredToConsumer() && entry.acquire())
- {
-
- ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(_queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
{
- @Override
- public void postCommit()
+ try
{
- try
- {
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- super.postCommit();
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
}
+ super.postCommit();
}
- );
- txn.commit();
-
+ }
+ );
+ txn.commit();
- }
}
+ else
+ {
+ queue.checkCapacity(AMQChannel.this);
+ }
+
+ }
+ }
+
+ private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
+ queue.checkCapacity(AMQChannel.this);
}
}
@@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+ final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
_logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
- return;
}
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final AMQQueue queue = rejectedQueueEntry.getQueue();
-
- final Exchange altExchange = queue.getAlternateExchange();
- unackedMap.remove(deliveryTag);
+ int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
- if (altExchange == null)
+ if(requeues == 0)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- rejectedQueueEntry.delete();
- return;
- }
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+ final Exchange altExchange = queue.getAlternateExchange();
- final List<? extends BaseQueue> destinationQueues =
- altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
-
- if (destinationQueues == null || destinationQueues.isEmpty())
- {
- _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
- rejectedQueueEntry.delete();
- return;
- }
-
- rejectedQueueEntry.routeToAlternate();
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 3b981b46b8..3d030890e0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
{
private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = { ACCEPTED };
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private Exchange _exchange;
private TerminusDurability _durability;
@@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
- if(queues == null || queues.isEmpty())
- {
- Exchange altExchange = _exchange.getAlternateExchange();
- if(altExchange != null)
- {
- queues = altExchange.route(message, instanceProperties);
- }
- }
-
- if(queues != null && !queues.isEmpty())
- {
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
-
- txn.enqueue(queues,message, new ServerTransaction.Action()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit()
- {
- for(int i = 0; i < baseQueues.length; i++)
- {
- try
- {
- baseQueues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- }
- return ACCEPTED;
+ return enqueues == 0 ? REJECTED : ACCEPTED;
}
TerminusDurability getDurability()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 861b225e6f..f92a133919 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -624,32 +624,10 @@ public class MessageStoreTest extends QpidTestCase
storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);
- final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY);
ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
-
- trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : destinationQueues)
- {
- queue.enqueue(currentMessage);
- }
- }
- catch (AMQException e)
- {
- _logger.error("Problem enqueing message", e);
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null);
}