From 31ebe383026bc208da66614a8537f52d8f3ed87c Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 19 Dec 2006 16:07:12 +0000 Subject: QPID-216 BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body and passed down channel to subscription. SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal parameter through to the Subscription ConnectionStartOkMethodHandler.java - Saved the client properties so the client identifier can be used in comparison with the publisher id to implement no_local AMQMinaProtocolSession.java - added _clientProperties to store the sent client properties. AMQProtocolSession.java - interface changes to get/set ClientProperties ConcurrentSelectorDeliveryManager.java - only need to do hasInterset as this will take care of the hasFilters optimisation check. SubscriptionImpl.java - Added code to do comparison of client ids to determin insterest in a given message. SubscriptionSet.java - tidied up code to use hasInterest as this is where the nolocal is implemented. ConnectionStartMethodHandler.java - Moved literal values to a ClientProperties.java enumeration and a QpidProperties.java values. QpidConnectionMetaData.java - updated to get values from QpidProperties.java MockProtocolSession.java - null implementation of new get/set methods git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@488712 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 8 ++- .../server/handler/BasicConsumeMethodHandler.java | 7 ++- .../handler/ConnectionStartOkMethodHandler.java | 9 ++- .../server/protocol/AMQMinaProtocolSession.java | 66 ++++++++++++++-------- .../qpid/server/protocol/AMQProtocolSession.java | 6 ++ .../org/apache/qpid/server/queue/AMQQueue.java | 11 +++- .../queue/ConcurrentSelectorDeliveryManager.java | 2 +- .../qpid/server/queue/SubscriptionFactory.java | 10 ++-- .../apache/qpid/server/queue/SubscriptionImpl.java | 39 ++++++++----- .../apache/qpid/server/queue/SubscriptionSet.java | 22 +++----- 10 files changed, 112 insertions(+), 68 deletions(-) (limited to 'qpid/java/broker/src') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index d8485ef0f2..117231b36e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -286,12 +286,14 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber + * @param noLocal * @return the consumer tag. This is returned to the subscriber and used in * subsequent unsubscribe requests * @throws ConsumerTagNotUniqueException if the tag is not unique * @throws AMQException if something goes wrong */ - public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException + public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, + FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { @@ -302,7 +304,7 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters); + queue.registerProtocolSession(session, _channelId, tag, acks, filters,noLocal); _consumerTag2QueueMap.put(tag, queue); return tag; } @@ -499,7 +501,7 @@ public class AMQChannel if (_log.isDebugEnabled()) { _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag + - " and multiple " + multiple); + " and multiple " + multiple); } if (multiple) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index bf282020ee..1e57c714ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -77,7 +77,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener evt = new AMQMethodEvent(frame.channel, - (AMQMethodBody)frame.bodyFrame); + (AMQMethodBody) frame.bodyFrame); try { boolean wasAnyoneInterested = false; @@ -266,7 +272,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame); + getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -275,7 +281,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame); + getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); } /** @@ -355,6 +361,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * Close a specific channel. This will remove any resources used by the channel, including: *
  • any queue subscriptions (this may in turn remove queues if they are auto delete
  • *
+ * * @param channelId id of the channel to close * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid @@ -381,6 +388,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * In our current implementation this is used by the clustering code. + * * @param channelId */ public void removeChannel(int channelId) @@ -390,11 +398,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Initialise heartbeats on the session. + * * @param delay delay in seconds (not ms) */ public void initHeartbeats(int delay) { - if(delay > 0) + if (delay > 0) { _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay)); @@ -404,6 +413,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * Closes all channels that were opened by this protocol session. This frees up all resources * used by the channel. + * * @throws AMQException if an error occurs while closing any channel */ private void closeAllChannels() throws AMQException @@ -421,7 +431,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, */ public void closeSession() throws AMQException { - if(!_closed) + if (!_closed) { _closed = true; closeAllChannels(); @@ -463,11 +473,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // information is used by SASL primary. if (address instanceof InetSocketAddress) { - return ((InetSocketAddress)address).getHostName(); + return ((InetSocketAddress) address).getHostName(); } else if (address instanceof VmPipeAddress) { - return "vmpipe:" + ((VmPipeAddress)address).getPort(); + return "vmpipe:" + ((VmPipeAddress) address).getPort(); } else { @@ -484,22 +494,32 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _saslServer = saslServer; } - + + public FieldTable getClientProperties() + { + return _clientProperties; + } + + public void setClientProperties(FieldTable clientProperties) + { + _clientProperties = clientProperties; + } + /** * Convenience methods for managing AMQP version. * NOTE: Both major and minor will be set to 0 prior to protocol initiation. */ - + public byte getAmqpMajor() { return _major; } - + public byte getAmqpMinor() { return _minor; } - + public boolean amqpVersionEquals(byte major, byte minor) { return _major == major && _minor == minor; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index acaf6b0d9b..03d0c50dac 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; @@ -122,4 +123,9 @@ public interface AMQProtocolSession * @param saslServer */ void setSaslServer(SaslServer saslServer); + + + FieldTable getClientProperties(); + + void setClientProperties(FieldTable clientProperties); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index e64daef690..561b719b2e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -96,7 +96,7 @@ public class AMQQueue implements Managable, Comparable * max allowed number of messages on a queue. */ private Integer _maxMessageCount = 10000; - + /** * max queue depth(KB) for the queue */ @@ -362,12 +362,17 @@ public class AMQQueue implements Managable, Comparable _bindings.addBinding(routingKey, exchange); } - public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException + { + registerProtocolSession(ps, channel, consumerTag, acks, filters, false); + } + + public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); - Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters); + Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal); _subscribers.addSubscriber(subscription); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index d8bb6e1948..8bdadcb493 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -281,7 +281,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } // Only give the message to those that want them. - if (sub.hasFilters() && sub.hasInterest(msg)) + if (sub.hasInterest(msg)) { sub.enqueueForPreDelivery(msg); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java index f464384562..2bb77dc649 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java @@ -33,12 +33,10 @@ import org.apache.qpid.framing.FieldTable; */ public interface SubscriptionFactory { - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) - throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, + FieldTable filters, boolean noLocal) throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException; - Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag) - throws AMQException; + Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) + throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 79b0593f69..fc00754cda 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; @@ -56,6 +57,7 @@ public class SubscriptionImpl implements Subscription private Queue _messages; + private final boolean _noLocal; /** * True if messages need to be acknowledged @@ -65,21 +67,15 @@ public class SubscriptionImpl implements Subscription public static class Factory implements SubscriptionFactory { - public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException + public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters); - } - - public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks) - throws AMQException - { - return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null); + return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal); } public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag) throws AMQException { - return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null); + return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false); } } @@ -87,11 +83,11 @@ public class SubscriptionImpl implements Subscription String consumerTag, boolean acks) throws AMQException { - this(channelId, protocolSession, consumerTag, acks, null); + this(channelId, protocolSession, consumerTag, acks, null, false); } public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession, - String consumerTag, boolean acks, FieldTable filters) + String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); @@ -105,6 +101,8 @@ public class SubscriptionImpl implements Subscription this.consumerTag = consumerTag; sessionKey = protocolSession.getKey(); _acks = acks; + _noLocal = noLocal; + _filters = FilterManagerFactory.createManager(filters); if (_filters != null) @@ -218,7 +216,22 @@ public class SubscriptionImpl implements Subscription public boolean hasInterest(AMQMessage msg) { - return _filters.allAllow(msg); + if (_noLocal) + { + return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( + msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))); + } + else + { + if (_filters != null) + { + return _filters.allAllow(msg); + } + else + { + return true; + } + } } public Queue getPreDeliveryQueue() @@ -235,8 +248,6 @@ public class SubscriptionImpl implements Subscription } - - private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index a4afe18e4d..91e720ea54 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -139,22 +139,15 @@ class SubscriptionSet implements WeightedSubscriptionManager if (!subscription.isSuspended()) { - if (!subscription.hasFilters()) + if (subscription.hasInterest(msg)) { - return subscription; - } - else - { - if (subscription.hasInterest(msg)) + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty()) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (subscription.getPreDeliveryQueue().isEmpty()) - { - return subscription; - } + return subscription; } } } @@ -208,6 +201,7 @@ class SubscriptionSet implements WeightedSubscriptionManager /** * Notification that a queue has been deleted. This is called so that the subscription can inform the * channel, which in turn can update its list of unacknowledged messages. + * * @param queue */ public void queueDeleted(AMQQueue queue) -- cgit v1.2.1