diff options
Diffstat (limited to 'java/broker/src')
4 files changed, 90 insertions, 52 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 8dc4626c46..d4226c42aa 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.management.DefaultManagedObject; +import org.apache.qpid.server.management.Managable; +import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.TxnBuffer; import org.apache.qpid.server.txn.TxnOp; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.management.DefaultManagedObject; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; import javax.management.JMException; import javax.management.MBeanException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel implements Managable { @@ -62,7 +64,7 @@ public class AMQChannel implements Managable * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the <b>last</b> tag sent out */ - private long _deliveryTag; + private AtomicLong _deliveryTag = new AtomicLong(0); /** * A channel has a default queue (the last declared) that is used when no queue name is @@ -74,7 +76,7 @@ public class AMQChannel implements Managable * This tag is unique per subscription to a queue. The server returns this in response to a * basic.consume request. */ - private int _consumerTag = 0; + private int _consumerTag; /** * The current message - which may be partial in the sense that not all frames have been received yet - @@ -150,7 +152,7 @@ public class AMQChannel implements Managable _txnBuffer.commit(); } } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -160,13 +162,13 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { try { _txnBuffer.rollback(); } - catch(AMQException ex) + catch (AMQException ex) { throw new MBeanException(ex, ex.toString()); } @@ -201,7 +203,7 @@ public class AMQChannel implements Managable } public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) - throws AMQException + throws AMQException { _channelId = channelId; _channelName = _channelId + "-" + this.hashCode(); @@ -300,7 +302,7 @@ public class AMQChannel implements Managable public long getNextDeliveryTag() { - return ++_deliveryTag; + return _deliveryTag.incrementAndGet(); } public int getNextConsumerTag() @@ -348,7 +350,7 @@ public class AMQChannel implements Managable else { throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " + - _channelId); + _channelId); } } @@ -361,7 +363,7 @@ public class AMQChannel implements Managable { if (_transactional) { - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback();//releases messages } @@ -390,7 +392,7 @@ public class AMQChannel implements Managable */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag)); checkSuspension(); @@ -405,7 +407,7 @@ public class AMQChannel implements Managable { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Map<Long, UnacknowledgedMessage> currentList; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { currentList = _unacknowledgedMessageMap; _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH); @@ -426,7 +428,7 @@ public class AMQChannel implements Managable public void resend(AMQProtocolSession session) { //messages go to this channel - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet()) { @@ -449,7 +451,7 @@ public class AMQChannel implements Managable */ public void queueDeleted(AMQQueue queue) { - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet()) { @@ -465,13 +467,25 @@ public class AMQChannel implements Managable catch (AMQException e) { _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " + - e, e); + e, e); } } } } } + public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue) + { + long deliveryTag = getNextDeliveryTag(); + + if (acks) + { + addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + return deliveryTag; + } + /** * Acknowledge one or more messages. * @@ -498,7 +512,7 @@ public class AMQChannel implements Managable if (multiple) { LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { if (deliveryTag == 0) { @@ -514,10 +528,20 @@ public class AMQChannel implements Managable throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); } Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Long, UnacknowledgedMessage> unacked = i.next(); + + if (unacked.getKey() > deliveryTag) + { + //This should not occur now. + throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString()); + } + i.remove(); + acked.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { @@ -525,11 +549,12 @@ public class AMQChannel implements Managable } } } - } + }// synchronized + if (_log.isDebugEnabled()) { _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + - acked.size() + " items."); + acked.size() + " items."); } for (UnacknowledgedMessage msg : acked) @@ -541,12 +566,14 @@ public class AMQChannel implements Managable else { UnacknowledgedMessage msg; - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { msg = _unacknowledgedMessageMap.remove(deliveryTag); } + if (msg == null) { + _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } msg.discard(); @@ -573,7 +600,7 @@ public class AMQChannel implements Managable { boolean suspend; //noinspection SynchronizeOnNonFinalField - synchronized (_unacknowledgedMessageMapLock) + synchronized(_unacknowledgedMessageMapLock) { suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; } @@ -614,7 +641,7 @@ public class AMQChannel implements Managable public void rollback() throws AMQException { //need to protect rollback and close from each other... - synchronized (_txnBuffer) + synchronized(_txnBuffer) { _txnBuffer.rollback(); } diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java index 7f1c7df224..a703595cc4 100644 --- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java @@ -17,19 +17,21 @@ */ package org.apache.qpid.server.exchange; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.log4j.Logger; -import javax.management.openmbean.*; -import javax.management.MBeanException; import javax.management.JMException; +import javax.management.MBeanException; +import javax.management.openmbean.*; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.ArrayList; public class DestNameExchange extends AbstractExchange { @@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange } public void createBinding(String queueName, String binding) - throws JMException + throws JMException { AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName); if (queue == null) + { throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); + } try { @@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange { assert queue != null; assert routingKey != null; - if(!_index.add(routingKey, queue)) + if (!_index.add(routingKey, queue)) { _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); } @@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for(AMQQueue q :queues) + for (AMQQueue q : queues) { q.deliver(payload); } diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java index a3c2fab4f4..ef18f61070 100644 --- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -19,12 +19,12 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.AMQException; /** * Encapsulation of a supscription to a queue. @@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription throws AMQException { AMQChannel channel = protocolSession.getChannel(channelId); - if (channel == null) { + if (channel == null) + { throw new NullPointerException("channel not found in protocol session"); } @@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription private boolean equals(SubscriptionImpl psc) { return sessionKey.equals(psc.sessionKey) - && psc.channel == channel - && psc.consumerTag.equals(consumerTag); + && psc.channel == channel + && psc.consumerTag.equals(consumerTag); } public int hashCode() @@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]"; } + /** + * This method can be called by each of the publisher threads. + * As a result all changes to the channel object must be thread safe. + * + * @param msg + * @param queue + * @throws AMQException + */ public void send(AMQMessage msg, AMQQueue queue) throws AMQException { if (msg != null) { - final long deliveryTag = channel.getNextDeliveryTag(); + long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue); + ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName()); AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId()); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } + protocolSession.writeFrame(frame); + // if we do not need to wait for client acknowledgements we can decrement // the reference count immediately if (!_acks) diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java index baa414ff19..8dd268e673 100644 --- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java @@ -17,21 +17,20 @@ */ package org.apache.qpid.server.store; +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.AMQException; -import org.apache.log4j.Logger; -import org.apache.commons.configuration.Configuration; -import java.util.concurrent.ConcurrentMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.List; /** * A simple message store that stores the messages in a threadsafe structure in memory. - * */ public class MemoryMessageStore implements MessageStore { @@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore public void configure() { _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table"); - _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); + _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY); } public void configure(String base, Configuration config) @@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { - if(_messageMap != null) + if (_messageMap != null) { _messageMap.clear(); _messageMap = null; |
