summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-01 15:40:47 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-01 15:40:47 +0000
commit6823d23dbeca328f4e860538a52015bc9313a6db (patch)
tree8af7823f6e4ac169835909ed6babf38d4ad42e57 /qpid/java/broker-plugins
parentade50f17b8ffea099f8fffaaf283b2412f393bce (diff)
downloadqpid-python-6823d23dbeca328f4e860538a52015bc9313a6db.tar.gz
QPID-5504 : Moving routing to Exchange from session classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java18
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java42
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java170
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java48
5 files changed, 136 insertions, 167 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index fe82f65115..bae5616042 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -102,6 +104,14 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
+ private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ entry.getQueue().checkCapacity(ServerSession.this);
+ }
+ };
public static interface MessageDispositionChangeListener
{
@@ -182,7 +192,9 @@ public class ServerSession extends Session
return isCommandsFull(id);
}
- public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
+ public int enqueue(final MessageTransferMessage message,
+ final InstanceProperties instanceProperties,
+ final Exchange exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -190,10 +202,10 @@ public class ServerSession extends Session
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
+ int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction);
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction);
incrementOutstandingTxnsIfNecessary();
+ return enqueues;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 973f706e0a..dcca696529 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
};
- List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
- if(queues.isEmpty() && exchange.getAlternateExchange() != null)
- {
- final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(message, instanceProperties);
- if (!queues.isEmpty())
- {
- exchangeInUse = alternateExchange;
- }
- else
- {
- exchangeInUse = exchange;
- }
- }
- else
- {
- exchangeInUse = exchange;
- }
+ int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(!queues.isEmpty())
+ if(enqueues != 0)
{
- serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 17d0e5cb64..357b565365 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -59,7 +59,6 @@ import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
void reject(final QueueEntry entry)
{
entry.setRedelivered();
- entry.routeToAlternate();
+ entry.routeToAlternate(null, null);
if(entry.isAcquiredBy(this))
{
entry.delete();
@@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
protected void sendToDLQOrDiscard(QueueEntry entry)
{
- final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
- if (alternateExchange != null)
+
+ int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
{
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
+ final AMQQueue queue = entry.getQueue();
+ final Exchange alternateExchange = queue.getAlternateExchange();
- if (destinationQueues == null || destinationQueues.isEmpty())
+ if(alternateExchange != null)
{
- entry.delete();
-
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
}
else
{
- entry.routeToAlternate();
-
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
- }
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
}
}
- else
- {
- entry.delete();
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
- }
}
private boolean isMaxDeliveryLimitReached(QueueEntry entry)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b7dc105cb7..c6d4151628 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+
+ private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
+ private final ImmediateAction _immediateAction = new ImmediateAction();
+
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
+ final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate();
+
final InstanceProperties instanceProperties =
new InstanceProperties()
{
@@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
case EXPIRATION:
return amqMessage.getExpiration();
case IMMEDIATE:
- return _currentMessage.getMessagePublishInfo().isImmediate();
+ return immediate;
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
@@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
};
- final List<? extends BaseQueue> destinationQueues =
- _currentMessage.getExchange().route(amqMessage, instanceProperties);
-
- if(destinationQueues == null || destinationQueues.isEmpty())
+ int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ immediate ? _immediateAction : _capacityCheckAction);
+ if(enqueues == 0)
{
handleUnroutableMessage(amqMessage);
}
else
{
- _transaction.enqueue(destinationQueues,
- amqMessage,
- new MessageDeliveryAction(amqMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
handle.flushToStore();
-
}
}
}
@@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(immediate)
{
- action = new ImmediateAction(queue);
+ action = new ImmediateAction();
}
else
{
@@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_reference.release();
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+
+ }
+ private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ {
+
+ public ImmediateAction()
{
- private final BaseQueue _queue;
+ }
- public ImmediateAction(BaseQueue queue)
- {
- _queue = queue;
- }
+ public void onEnqueue(QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
- public void onEnqueue(QueueEntry entry)
+ if (!entry.getDeliveredToConsumer() && entry.acquire())
{
- if (!entry.getDeliveredToConsumer() && entry.acquire())
- {
-
- ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(_queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
{
- @Override
- public void postCommit()
+ try
{
- try
- {
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- super.postCommit();
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
}
+ super.postCommit();
}
- );
- txn.commit();
-
+ }
+ );
+ txn.commit();
- }
}
+ else
+ {
+ queue.checkCapacity(AMQChannel.this);
+ }
+
+ }
+ }
+
+ private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
+ queue.checkCapacity(AMQChannel.this);
}
}
@@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+ final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
_logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
- return;
}
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final AMQQueue queue = rejectedQueueEntry.getQueue();
-
- final Exchange altExchange = queue.getAlternateExchange();
- unackedMap.remove(deliveryTag);
+ int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
- if (altExchange == null)
+ if(requeues == 0)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- rejectedQueueEntry.delete();
- return;
- }
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+ final Exchange altExchange = queue.getAlternateExchange();
- final List<? extends BaseQueue> destinationQueues =
- altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
-
- if (destinationQueues == null || destinationQueues.isEmpty())
- {
- _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
- rejectedQueueEntry.delete();
- return;
- }
-
- rejectedQueueEntry.routeToAlternate();
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 3b981b46b8..3d030890e0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
{
private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = { ACCEPTED };
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private Exchange _exchange;
private TerminusDurability _durability;
@@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
- if(queues == null || queues.isEmpty())
- {
- Exchange altExchange = _exchange.getAlternateExchange();
- if(altExchange != null)
- {
- queues = altExchange.route(message, instanceProperties);
- }
- }
-
- if(queues != null && !queues.isEmpty())
- {
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
-
- txn.enqueue(queues,message, new ServerTransaction.Action()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit()
- {
- for(int i = 0; i < baseQueues.length; i++)
- {
- try
- {
- baseQueues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- }
- return ACCEPTED;
+ return enqueues == 0 ? REJECTED : ACCEPTED;
}
TerminusDurability getDurability()