diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-01 15:40:47 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-01 15:40:47 +0000 |
| commit | 6823d23dbeca328f4e860538a52015bc9313a6db (patch) | |
| tree | 8af7823f6e4ac169835909ed6babf38d4ad42e57 /qpid/java/broker-plugins | |
| parent | ade50f17b8ffea099f8fffaaf283b2412f393bce (diff) | |
| download | qpid-python-6823d23dbeca328f4e860538a52015bc9313a6db.tar.gz | |
QPID-5504 : Moving routing to Exchange from session classes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
5 files changed, 136 insertions, 167 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index fe82f65115..bae5616042 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -102,6 +104,14 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); + private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry entry) + { + entry.getQueue().checkCapacity(ServerSession.this); + } + }; public static interface MessageDispositionChangeListener { @@ -182,7 +192,9 @@ public class ServerSession extends Session return isCommandsFull(id); } - public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues) + public int enqueue(final MessageTransferMessage message, + final InstanceProperties instanceProperties, + final Exchange exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) @@ -190,10 +202,10 @@ public class ServerSession extends Session _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } + int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; - _transaction.enqueue(queues,message, postTransactionAction); incrementOutstandingTxnsIfNecessary(); + return enqueues; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 973f706e0a..dcca696529 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate } }; - List<? extends BaseQueue> queues = exchange.route(message, instanceProperties); - if(queues.isEmpty() && exchange.getAlternateExchange() != null) - { - final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(message, instanceProperties); - if (!queues.isEmpty()) - { - exchangeInUse = alternateExchange; - } - else - { - exchangeInUse = exchange; - } - } - else - { - exchangeInUse = exchange; - } + int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(!queues.isEmpty()) + if(enqueues != 0) { - serverSession.enqueue(message, queues); storeMessage.flushToStore(); } else @@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); + serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 17d0e5cb64..357b565365 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -59,7 +59,6 @@ import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr void reject(final QueueEntry entry) { entry.setRedelivered(); - entry.routeToAlternate(); + entry.routeToAlternate(null, null); if(entry.isAcquiredBy(this)) { entry.delete(); @@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr protected void sendToDLQOrDiscard(QueueEntry entry) { - final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); - if (alternateExchange != null) + + int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry requeueEntry) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); + + if (requeues == 0) { - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); + final AMQQueue queue = entry.getQueue(); + final Exchange alternateExchange = queue.getAlternateExchange(); - if (destinationQueues == null || destinationQueues.isEmpty()) + if(alternateExchange != null) { - entry.delete(); - - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); } else { - entry.routeToAlternate(); - - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName())); - } + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); } } - else - { - entry.delete(); - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); - } } private boolean isMaxDeliveryLimitReached(QueueEntry entry) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b7dc105cb7..c6d4151628 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); + private final ImmediateAction _immediateAction = new ImmediateAction(); + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { @@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { + final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate(); + final InstanceProperties instanceProperties = new InstanceProperties() { @@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F case EXPIRATION: return amqMessage.getExpiration(); case IMMEDIATE: - return _currentMessage.getMessagePublishInfo().isImmediate(); + return immediate; case PERSISTENT: return amqMessage.isPersistent(); case MANDATORY: @@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - final List<? extends BaseQueue> destinationQueues = - _currentMessage.getExchange().route(amqMessage, instanceProperties); - - if(destinationQueues == null || destinationQueues.isEmpty()) + int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + immediate ? _immediateAction : _capacityCheckAction); + if(enqueues == 0) { handleUnroutableMessage(amqMessage); } else { - _transaction.enqueue(destinationQueues, - amqMessage, - new MessageDeliveryAction(amqMessage, destinationQueues)); incrementOutstandingTxnsIfNecessary(); handle.flushToStore(); - } } } @@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(immediate) { - action = new ImmediateAction(queue); + action = new ImmediateAction(); } else { @@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _reference.release(); } - private class ImmediateAction implements BaseQueue.PostEnqueueAction + + } + private class ImmediateAction implements BaseQueue.PostEnqueueAction + { + + public ImmediateAction() { - private final BaseQueue _queue; + } - public ImmediateAction(BaseQueue queue) - { - _queue = queue; - } + public void onEnqueue(QueueEntry entry) + { + AMQQueue queue = entry.getQueue(); - public void onEnqueue(QueueEntry entry) + if (!entry.getDeliveredToConsumer() && entry.acquire()) { - if (!entry.getDeliveredToConsumer() && entry.acquire()) - { - - ServerTransaction txn = new LocalTransaction(_messageStore); - Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); - entries.add(entry); - final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(_queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1); + entries.add(entry); + final AMQMessage message = (AMQMessage) entry.getMessage(); + txn.dequeue(queue, entry.getMessage(), + new MessageAcknowledgeAction(entries) + { + @Override + public void postCommit() { - @Override - public void postCommit() + try { - try - { - final - ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); - - outputConverter.writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - IMMEDIATE_DELIVERY_REPLY_TEXT); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - super.postCommit(); + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); } + super.postCommit(); } - ); - txn.commit(); - + } + ); + txn.commit(); - } } + else + { + queue.checkCapacity(AMQChannel.this); + } + + } + } + + private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction + { + @Override + public void onEnqueue(final QueueEntry entry) + { + AMQQueue queue = entry.getQueue(); + queue.checkCapacity(AMQChannel.this); } } @@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); - final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); + final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag); if (rejectedQueueEntry == null) { _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); - return; } else { final ServerMessage msg = rejectedQueueEntry.getMessage(); - final AMQQueue queue = rejectedQueueEntry.getQueue(); - - final Exchange altExchange = queue.getAlternateExchange(); - unackedMap.remove(deliveryTag); + int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry requeueEntry) + { + _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); - if (altExchange == null) + if(requeues == 0) { - _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - rejectedQueueEntry.delete(); - return; - } + final AMQQueue queue = rejectedQueueEntry.getQueue(); + final Exchange altExchange = queue.getAlternateExchange(); - final List<? extends BaseQueue> destinationQueues = - altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties()); - - if (destinationQueues == null || destinationQueues.isEmpty()) - { - _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); - rejectedQueueEntry.delete(); - return; - } - - rejectedQueueEntry.routeToAlternate(); + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName())); + } + else + { + _logger.debug( + "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + + deliveryTag); + _actor.message(_logSubject, + ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 3b981b46b8..3d030890e0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Rejected; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; @@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction; public class ExchangeDestination implements ReceivingDestination, SendingDestination { private static final Accepted ACCEPTED = new Accepted(); - private static final Outcome[] OUTCOMES = { ACCEPTED }; + public static final Rejected REJECTED = new Rejected(); + private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; private Exchange _exchange; private TerminusDurability _durability; @@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties); + int enqueues = _exchange.send(message, instanceProperties, txn, null); - if(queues == null || queues.isEmpty()) - { - Exchange altExchange = _exchange.getAlternateExchange(); - if(altExchange != null) - { - queues = altExchange.route(message, instanceProperties); - } - } - - if(queues != null && !queues.isEmpty()) - { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); - - txn.enqueue(queues,message, new ServerTransaction.Action() - { - MessageReference _reference = message.newReference(); - - public void postCommit() - { - for(int i = 0; i < baseQueues.length; i++) - { - try - { - baseQueues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - _reference.release(); - } - - public void onRollback() - { - _reference.release(); - } - }); - } - return ACCEPTED; + return enqueues == 0 ? REJECTED : ACCEPTED; } TerminusDurability getDurability() |
