diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 21:43:46 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 21:43:46 +0000 |
| commit | 04560e1d3c28fb21a8f8ae094a62318790474e61 (patch) | |
| tree | 32bef19f14c57e5722f29b3338a40b5dce145aba /qpid/java/broker-plugins/amqp-1-0-protocol | |
| parent | 8a8da44b127644ac9cfb43eef88d3f31a35f9aca (diff) | |
| download | qpid-python-04560e1d3c28fb21a8f8ae094a62318790474e61.tar.gz | |
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol')
4 files changed, 32 insertions, 9 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 836eb69350..3b981b46b8 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.txn.ServerTransaction; @@ -54,14 +55,37 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina public Outcome send(final Message_1_0 message, ServerTransaction txn) { - List<? extends BaseQueue> queues = _exchange.route(message); + final InstanceProperties instanceProperties = + new InstanceProperties() + { + + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case MANDATORY: + return false; + case REDELIVERED: + return false; + case PERSISTENT: + return message.isPersistent(); + case IMMEDIATE: + return false; + case EXPIRATION: + return message.getExpiration(); + } + return null; + }}; + + List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties); if(queues == null || queues.isEmpty()) { Exchange altExchange = _exchange.getAlternateExchange(); if(altExchange != null) { - queues = altExchange.route(message); + queues = altExchange.route(message, instanceProperties); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index e367c83c8a..66094f52f0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -25,10 +25,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.store.StoredMessage; -public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage +public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> { private List<ByteBuffer> _fragments; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ad05bd8a1b..823e4cb16d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -539,9 +538,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public boolean onSameConnection(InboundMessage inbound) + public Object getConnectionReference() { - return inbound.getConnectionReference() == getConnection().getReference(); + return getConnection().getReference(); } @Override diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index ce653766ff..e5f3a52e3b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; -class Subscription_1_0 implements Subscription +class + Subscription_1_0 implements Subscription { private SendingLink_1_0 _link; @@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscription private boolean checkFilters(final QueueEntry entry) { - return (_filters == null) || _filters.allAllow(entry); + return (_filters == null) || _filters.allAllow(entry.asFilterable()); } public boolean isClosed() |
