summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java77
-rw-r--r--java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java24
-rw-r--r--java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java26
-rw-r--r--java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java15
4 files changed, 90 insertions, 52 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 8dc4626c46..d4226c42aa 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -23,26 +23,28 @@ import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.management.DefaultManagedObject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.txn.TxnOp;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.management.DefaultManagedObject;
-import javax.management.ObjectName;
-import javax.management.MalformedObjectNameException;
import javax.management.JMException;
import javax.management.MBeanException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel implements Managable
{
@@ -62,7 +64,7 @@ public class AMQChannel implements Managable
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out
*/
- private long _deliveryTag;
+ private AtomicLong _deliveryTag = new AtomicLong(0);
/**
* A channel has a default queue (the last declared) that is used when no queue name is
@@ -74,7 +76,7 @@ public class AMQChannel implements Managable
* This tag is unique per subscription to a queue. The server returns this in response to a
* basic.consume request.
*/
- private int _consumerTag = 0;
+ private int _consumerTag;
/**
* The current message - which may be partial in the sense that not all frames have been received yet -
@@ -150,7 +152,7 @@ public class AMQChannel implements Managable
_txnBuffer.commit();
}
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -160,13 +162,13 @@ public class AMQChannel implements Managable
{
if (_transactional)
{
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
try
{
_txnBuffer.rollback();
}
- catch(AMQException ex)
+ catch (AMQException ex)
{
throw new MBeanException(ex, ex.toString());
}
@@ -201,7 +203,7 @@ public class AMQChannel implements Managable
}
public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws AMQException
{
_channelId = channelId;
_channelName = _channelId + "-" + this.hashCode();
@@ -300,7 +302,7 @@ public class AMQChannel implements Managable
public long getNextDeliveryTag()
{
- return ++_deliveryTag;
+ return _deliveryTag.incrementAndGet();
}
public int getNextConsumerTag()
@@ -348,7 +350,7 @@ public class AMQChannel implements Managable
else
{
throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " +
- _channelId);
+ _channelId);
}
}
@@ -361,7 +363,7 @@ public class AMQChannel implements Managable
{
if (_transactional)
{
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
_txnBuffer.rollback();//releases messages
}
@@ -390,7 +392,7 @@ public class AMQChannel implements Managable
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
{
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
_unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag));
checkSuspension();
@@ -405,7 +407,7 @@ public class AMQChannel implements Managable
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Map<Long, UnacknowledgedMessage> currentList;
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
currentList = _unacknowledgedMessageMap;
_unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
@@ -426,7 +428,7 @@ public class AMQChannel implements Managable
public void resend(AMQProtocolSession session)
{
//messages go to this channel
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
{
@@ -449,7 +451,7 @@ public class AMQChannel implements Managable
*/
public void queueDeleted(AMQQueue queue)
{
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
{
@@ -465,13 +467,25 @@ public class AMQChannel implements Managable
catch (AMQException e)
{
_log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
- e, e);
+ e, e);
}
}
}
}
}
+ public synchronized long prepareNewMessageForDelivery(boolean acks, AMQMessage msg, String consumerTag, AMQQueue queue)
+ {
+ long deliveryTag = getNextDeliveryTag();
+
+ if (acks)
+ {
+ addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ return deliveryTag;
+ }
+
/**
* Acknowledge one or more messages.
*
@@ -498,7 +512,7 @@ public class AMQChannel implements Managable
if (multiple)
{
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
if (deliveryTag == 0)
{
@@ -514,10 +528,20 @@ public class AMQChannel implements Managable
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
+
while (i.hasNext())
{
+
Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
+
+ if (unacked.getKey() > deliveryTag)
+ {
+ //This should not occur now.
+ throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
+ }
+
i.remove();
+
acked.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
@@ -525,11 +549,12 @@ public class AMQChannel implements Managable
}
}
}
- }
+ }// synchronized
+
if (_log.isDebugEnabled())
{
_log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
- acked.size() + " items.");
+ acked.size() + " items.");
}
for (UnacknowledgedMessage msg : acked)
@@ -541,12 +566,14 @@ public class AMQChannel implements Managable
else
{
UnacknowledgedMessage msg;
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
msg = _unacknowledgedMessageMap.remove(deliveryTag);
}
+
if (msg == null)
{
+ _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
msg.discard();
@@ -573,7 +600,7 @@ public class AMQChannel implements Managable
{
boolean suspend;
//noinspection SynchronizeOnNonFinalField
- synchronized (_unacknowledgedMessageMapLock)
+ synchronized(_unacknowledgedMessageMapLock)
{
suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
}
@@ -614,7 +641,7 @@ public class AMQChannel implements Managable
public void rollback() throws AMQException
{
//need to protect rollback and close from each other...
- synchronized (_txnBuffer)
+ synchronized(_txnBuffer)
{
_txnBuffer.rollback();
}
diff --git a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
index 7f1c7df224..a703595cc4 100644
--- a/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -17,19 +17,21 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.log4j.Logger;
-import javax.management.openmbean.*;
-import javax.management.MBeanException;
import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.openmbean.*;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.ArrayList;
public class DestNameExchange extends AbstractExchange
{
@@ -117,12 +119,14 @@ public class DestNameExchange extends AbstractExchange
}
public void createBinding(String queueName, String binding)
- throws JMException
+ throws JMException
{
AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
if (queue == null)
+ {
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+ }
try
{
@@ -147,7 +151,7 @@ public class DestNameExchange extends AbstractExchange
{
assert queue != null;
assert routingKey != null;
- if(!_index.add(routingKey, queue))
+ if (!_index.add(routingKey, queue))
{
_logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
}
@@ -195,7 +199,7 @@ public class DestNameExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + queues);
}
- for(AMQQueue q :queues)
+ for (AMQQueue q : queues)
{
q.deliver(payload);
}
diff --git a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
index a3c2fab4f4..ef18f61070 100644
--- a/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -19,12 +19,12 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.AMQException;
/**
* Encapsulation of a supscription to a queue.
@@ -70,7 +70,8 @@ public class SubscriptionImpl implements Subscription
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
- if (channel == null) {
+ if (channel == null)
+ {
throw new NullPointerException("channel not found in protocol session");
}
@@ -99,8 +100,8 @@ public class SubscriptionImpl implements Subscription
private boolean equals(SubscriptionImpl psc)
{
return sessionKey.equals(psc.sessionKey)
- && psc.channel == channel
- && psc.consumerTag.equals(consumerTag);
+ && psc.channel == channel
+ && psc.consumerTag.equals(consumerTag);
}
public int hashCode()
@@ -113,18 +114,25 @@ public class SubscriptionImpl implements Subscription
return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
}
+ /**
+ * This method can be called by each of the publisher threads.
+ * As a result all changes to the channel object must be thread safe.
+ *
+ * @param msg
+ * @param queue
+ * @throws AMQException
+ */
public void send(AMQMessage msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
- final long deliveryTag = channel.getNextDeliveryTag();
+ long deliveryTag = channel.prepareNewMessageForDelivery(_acks,msg,consumerTag,queue);
+
ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
+
protocolSession.writeFrame(frame);
+
// if we do not need to wait for client acknowledgements we can decrement
// the reference count immediately
if (!_acks)
diff --git a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
index baa414ff19..8dd268e673 100644
--- a/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -17,21 +17,20 @@
*/
package org.apache.qpid.server.store;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-import org.apache.commons.configuration.Configuration;
-import java.util.concurrent.ConcurrentMap;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.List;
/**
* A simple message store that stores the messages in a threadsafe structure in memory.
- *
*/
public class MemoryMessageStore implements MessageStore
{
@@ -48,7 +47,7 @@ public class MemoryMessageStore implements MessageStore
public void configure()
{
_log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+ _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
@@ -65,7 +64,7 @@ public class MemoryMessageStore implements MessageStore
public void close() throws Exception
{
- if(_messageMap != null)
+ if (_messageMap != null)
{
_messageMap.clear();
_messageMap = null;