summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-22 21:43:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-22 21:43:46 +0000
commit04560e1d3c28fb21a8f8ae094a62318790474e61 (patch)
tree32bef19f14c57e5722f29b3338a40b5dce145aba /qpid/java/broker-plugins/amqp-1-0-protocol
parent8a8da44b127644ac9cfb43eef88d3f31a35f9aca (diff)
downloadqpid-python-04560e1d3c28fb21a8f8ae094a62318790474e61.tar.gz
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java28
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java5
4 files changed, 32 insertions, 9 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 836eb69350..3b981b46b8 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -54,14 +55,37 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- List<? extends BaseQueue> queues = _exchange.route(message);
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case MANDATORY:
+ return false;
+ case REDELIVERED:
+ return false;
+ case PERSISTENT:
+ return message.isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return message.getExpiration();
+ }
+ return null;
+ }};
+
+ List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
if(queues == null || queues.isEmpty())
{
Exchange altExchange = _exchange.getAlternateExchange();
if(altExchange != null)
{
- queues = altExchange.route(message);
+ queues = altExchange.route(message, instanceProperties);
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index e367c83c8a..66094f52f0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -25,10 +25,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
private List<ByteBuffer> _fragments;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ad05bd8a1b..823e4cb16d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -539,9 +538,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public boolean onSameConnection(InboundMessage inbound)
+ public Object getConnectionReference()
{
- return inbound.getConnectionReference() == getConnection().getReference();
+ return getConnection().getReference();
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index ce653766ff..e5f3a52e3b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class Subscription_1_0 implements Subscription
+class
+ Subscription_1_0 implements Subscription
{
private SendingLink_1_0 _link;
@@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscription
private boolean checkFilters(final QueueEntry entry)
{
- return (_filters == null) || _filters.allAllow(entry);
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
}
public boolean isClosed()