diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 21:43:46 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 21:43:46 +0000 |
| commit | 04560e1d3c28fb21a8f8ae094a62318790474e61 (patch) | |
| tree | 32bef19f14c57e5722f29b3338a40b5dce145aba /qpid/java/broker-plugins | |
| parent | 8a8da44b127644ac9cfb43eef88d3f31a35f9aca (diff) | |
| download | qpid-python-04560e1d3c28fb21a8f8ae094a62318790474e61.tar.gz | |
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
12 files changed, 104 insertions, 51 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index a471e53fc6..2e74621814 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.transport.DeliveryProperties; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index e01fb474ac..487862bcba 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -22,15 +22,13 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.transport.Header; import java.nio.ByteBuffer; -public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage +public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> { public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index f98eb09b43..261c937836 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -53,9 +53,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -766,12 +764,12 @@ public class ServerSession extends Session } } - public boolean onSameConnection(InboundMessage inbound) + @Override + public Object getConnectionReference() { - return inbound.getConnectionReference() == getConnection().getReference(); + return getConnection().getReference(); } - public String toLogString() { long connectionId = super.getConnection() instanceof ServerConnection diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 182e71c957..8756beb690 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -33,7 +33,8 @@ import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; -import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -290,9 +291,8 @@ public class ServerSessionDelegate extends SessionDelegate { final Exchange exchange = getExchangeForMessage(ssn, xfr); - DeliveryProperties delvProps = null; - if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps - .hasExpiration()) + final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); + if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) { delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); } @@ -312,13 +312,36 @@ public class ServerSessionDelegate extends SessionDelegate final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); final ServerSession serverSession = (ServerSession) ssn; - MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); + final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); MessageReference<MessageTransferMessage> reference = message.newReference(); - List<? extends BaseQueue> queues = exchange.route(message); + + final InstanceProperties instanceProperties = new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case EXPIRATION: + return message.getExpiration(); + case IMMEDIATE: + return message.isImmediate(); + case MANDATORY: + return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; + case PERSISTENT: + return message.isPersistent(); + case REDELIVERED: + return delvProps.getRedelivered(); + } + return null; + } + }; + + List<? extends BaseQueue> queues = exchange.route(message, instanceProperties); if(queues.isEmpty() && exchange.getAlternateExchange() != null) { final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(message); + queues = alternateExchange.route(message, instanceProperties); if (!queues.isEmpty()) { exchangeInUse = alternateExchange; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 77b63906cc..f68973096a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -33,13 +33,11 @@ import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; -import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -230,7 +228,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private boolean checkFilters(QueueEntry entry) { - return (_filters == null) || _filters.allAllow(entry); + return (_filters == null) || _filters.allAllow(entry.asFilterable()); } public boolean isClosed() @@ -583,9 +581,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr final ServerMessage msg = entry.getMessage(); if (alternateExchange != null) { - final InboundMessage m = new InboundMessageAdapter(entry); - - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m); + final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry)); if (destinationQueues == null || destinationQueues.isEmpty()) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c6e0dfc3e2..bb1d1949a2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -65,7 +65,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; @@ -73,8 +73,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -331,7 +331,31 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage); + final InstanceProperties instanceProperties = + new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case EXPIRATION: + return amqMessage.getExpiration(); + case IMMEDIATE: + return _currentMessage.getMessagePublishInfo().isImmediate(); + case PERSISTENT: + return amqMessage.isPersistent(); + case MANDATORY: + return _currentMessage.getMessagePublishInfo().isMandatory(); + case REDELIVERED: + return false; + } + return null; + } + }; + + final List<? extends BaseQueue> destinationQueues = + _currentMessage.getExchange().route(amqMessage, instanceProperties); if(destinationQueues == null || destinationQueues.isEmpty()) { @@ -1472,9 +1496,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - public boolean onSameConnection(InboundMessage inbound) + @Override + public Object getConnectionReference() { - return getProtocolSession().getReference() == inbound.getConnectionReference(); + return getProtocolSession().getReference(); } public int getUnacknowledgedMessageCount() @@ -1550,9 +1575,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return; } - final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); - final List<? extends BaseQueue> destinationQueues = altExchange.route(m); + final List<? extends BaseQueue> destinationQueues = + altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry)); if (destinationQueues == null || destinationQueues.isEmpty()) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index b73b6bc0aa..833f5fb06f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -22,15 +22,11 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; import java.nio.ByteBuffer; @@ -38,7 +34,7 @@ import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage +public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -94,12 +90,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet return getMessageMetaData().getMessageHeader(); } - @Override - public boolean isRedelivered() - { - return false; - } - public MessagePublishInfo getMessagePublishInfo() { return getMessageMetaData().getMessagePublishInfo(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index f069042db3..d48e8b3dea 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -519,7 +520,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private boolean checkFilters(QueueEntry msg) { - return (_filters == null) || _filters.allAllow(msg); + return (_filters == null) || _filters.allAllow(msg.asFilterable()); } public boolean isAutoClose() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 836eb69350..3b981b46b8 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.txn.ServerTransaction; @@ -54,14 +55,37 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina public Outcome send(final Message_1_0 message, ServerTransaction txn) { - List<? extends BaseQueue> queues = _exchange.route(message); + final InstanceProperties instanceProperties = + new InstanceProperties() + { + + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case MANDATORY: + return false; + case REDELIVERED: + return false; + case PERSISTENT: + return message.isPersistent(); + case IMMEDIATE: + return false; + case EXPIRATION: + return message.getExpiration(); + } + return null; + }}; + + List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties); if(queues == null || queues.isEmpty()) { Exchange altExchange = _exchange.getAlternateExchange(); if(altExchange != null) { - queues = altExchange.route(message); + queues = altExchange.route(message, instanceProperties); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index e367c83c8a..66094f52f0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -25,10 +25,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.store.StoredMessage; -public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage +public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> { private List<ByteBuffer> _fragments; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ad05bd8a1b..823e4cb16d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -539,9 +538,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public boolean onSameConnection(InboundMessage inbound) + public Object getConnectionReference() { - return inbound.getConnectionReference() == getConnection().getReference(); + return getConnection().getReference(); } @Override diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index ce653766ff..e5f3a52e3b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; -class Subscription_1_0 implements Subscription +class + Subscription_1_0 implements Subscription { private SendingLink_1_0 _link; @@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscription private boolean checkFilters(final QueueEntry entry) { - return (_filters == null) || _filters.allAllow(entry); + return (_filters == null) || _filters.allAllow(entry.asFilterable()); } public boolean isClosed() |
