summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-22 21:43:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-22 21:43:46 +0000
commit04560e1d3c28fb21a8f8ae094a62318790474e61 (patch)
tree32bef19f14c57e5722f29b3338a40b5dce145aba /qpid/java/broker-plugins
parent8a8da44b127644ac9cfb43eef88d3f31a35f9aca (diff)
downloadqpid-python-04560e1d3c28fb21a8f8ae094a62318790474e61.tar.gz
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java1
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java37
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java39
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java28
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java5
12 files changed, 104 insertions, 51 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index a471e53fc6..2e74621814 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.transport.DeliveryProperties;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index e01fb474ac..487862bcba 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -22,15 +22,13 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.Header;
import java.nio.ByteBuffer;
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
{
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index f98eb09b43..261c937836 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -53,9 +53,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -766,12 +764,12 @@ public class ServerSession extends Session
}
}
- public boolean onSameConnection(InboundMessage inbound)
+ @Override
+ public Object getConnectionReference()
{
- return inbound.getConnectionReference() == getConnection().getReference();
+ return getConnection().getReference();
}
-
public String toLogString()
{
long connectionId = super.getConnection() instanceof ServerConnection
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 182e71c957..8756beb690 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -33,7 +33,8 @@ import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -290,9 +291,8 @@ public class ServerSessionDelegate extends SessionDelegate
{
final Exchange exchange = getExchangeForMessage(ssn, xfr);
- DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
- .hasExpiration())
+ final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+ if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
@@ -312,13 +312,36 @@ public class ServerSessionDelegate extends SessionDelegate
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+ final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
- List<? extends BaseQueue> queues = exchange.route(message);
+
+ final InstanceProperties instanceProperties = new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps.getRedelivered();
+ }
+ return null;
+ }
+ };
+
+ List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
if(queues.isEmpty() && exchange.getAlternateExchange() != null)
{
final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(message);
+ queues = alternateExchange.route(message, instanceProperties);
if (!queues.isEmpty())
{
exchangeInUse = alternateExchange;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 77b63906cc..f68973096a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -33,13 +33,11 @@ import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -230,7 +228,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private boolean checkFilters(QueueEntry entry)
{
- return (_filters == null) || _filters.allAllow(entry);
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
}
public boolean isClosed()
@@ -583,9 +581,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
final ServerMessage msg = entry.getMessage();
if (alternateExchange != null)
{
- final InboundMessage m = new InboundMessageAdapter(entry);
-
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
if (destinationQueues == null || destinationQueues.isEmpty())
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c6e0dfc3e2..bb1d1949a2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -65,7 +65,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -73,8 +73,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -331,7 +331,31 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
- final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage);
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case EXPIRATION:
+ return amqMessage.getExpiration();
+ case IMMEDIATE:
+ return _currentMessage.getMessagePublishInfo().isImmediate();
+ case PERSISTENT:
+ return amqMessage.isPersistent();
+ case MANDATORY:
+ return _currentMessage.getMessagePublishInfo().isMandatory();
+ case REDELIVERED:
+ return false;
+ }
+ return null;
+ }
+ };
+
+ final List<? extends BaseQueue> destinationQueues =
+ _currentMessage.getExchange().route(amqMessage, instanceProperties);
if(destinationQueues == null || destinationQueues.isEmpty())
{
@@ -1472,9 +1496,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- public boolean onSameConnection(InboundMessage inbound)
+ @Override
+ public Object getConnectionReference()
{
- return getProtocolSession().getReference() == inbound.getConnectionReference();
+ return getProtocolSession().getReference();
}
public int getUnacknowledgedMessageCount()
@@ -1550,9 +1575,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return;
}
- final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
- final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues =
+ altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
if (destinationQueues == null || destinationQueues.isEmpty())
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index b73b6bc0aa..833f5fb06f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -22,15 +22,11 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
import java.nio.ByteBuffer;
@@ -38,7 +34,7 @@ import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage
+public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -94,12 +90,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
return getMessageMetaData().getMessageHeader();
}
- @Override
- public boolean isRedelivered()
- {
- return false;
- }
-
public MessagePublishInfo getMessagePublishInfo()
{
return getMessageMetaData().getMessagePublishInfo();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index f069042db3..d48e8b3dea 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -519,7 +520,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private boolean checkFilters(QueueEntry msg)
{
- return (_filters == null) || _filters.allAllow(msg);
+ return (_filters == null) || _filters.allAllow(msg.asFilterable());
}
public boolean isAutoClose()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 836eb69350..3b981b46b8 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -54,14 +55,37 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- List<? extends BaseQueue> queues = _exchange.route(message);
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case MANDATORY:
+ return false;
+ case REDELIVERED:
+ return false;
+ case PERSISTENT:
+ return message.isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return message.getExpiration();
+ }
+ return null;
+ }};
+
+ List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
if(queues == null || queues.isEmpty())
{
Exchange altExchange = _exchange.getAlternateExchange();
if(altExchange != null)
{
- queues = altExchange.route(message);
+ queues = altExchange.route(message, instanceProperties);
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index e367c83c8a..66094f52f0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -25,10 +25,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
private List<ByteBuffer> _fragments;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ad05bd8a1b..823e4cb16d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -539,9 +538,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public boolean onSameConnection(InboundMessage inbound)
+ public Object getConnectionReference()
{
- return inbound.getConnectionReference() == getConnection().getReference();
+ return getConnection().getReference();
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index ce653766ff..e5f3a52e3b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class Subscription_1_0 implements Subscription
+class
+ Subscription_1_0 implements Subscription
{
private SendingLink_1_0 _link;
@@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscription
private boolean checkFilters(final QueueEntry entry)
{
- return (_filters == null) || _filters.allAllow(entry);
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
}
public boolean isClosed()