summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-02 10:01:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-02 10:01:21 +0000
commit2c10159e28ff85e52840d5c6964123e4c410458d (patch)
tree8cb649778976996aee1f21963a6c5c1bad981bcc /java/broker
parent6d4226a532443ab1fe33c7d486877dbb11e154de (diff)
downloadqpid-python-2c10159e28ff85e52840d5c6964123e4c410458d.tar.gz
QPID-3713 : Implement producer side flow control for 0-10 in Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java77
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java3
6 files changed, 115 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
index 9b357403a8..8266c1e79f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class ChannelLogSubject extends AbstractLogSubject
@@ -52,5 +55,30 @@ public class ChannelLogSubject extends AbstractLogSubject
session.getVirtualHost().getName(),
channel.getChannelId());
}
-
+
+ public ChannelLogSubject(ServerSession session)
+ {
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the required values according to
+ * these indices:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ ServerConnection connection = (ServerConnection) session.getConnection();
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ connection == null ? -1L : connection.getConnectionId(),
+ session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
+ (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
+ session.getVirtualHost().getName(),
+ session.getChannel());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index bc63403a86..c55fe321fc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
public interface AMQSessionModel
{
@@ -51,4 +53,8 @@ public interface AMQSessionModel
* @param idleClose time in milliseconds before closing connection with idle transaction
*/
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
+
+ void block(AMQQueue queue);
+
+ void unblock(AMQQueue queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 6dfdc5e8b4..a1f1c037ec 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -217,7 +217,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
Map<String, Object> getArguments();
- void checkCapacity(AMQChannel channel);
+ void checkCapacity(AMQSessionModel channel);
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 25fc91b998..ebed781a1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -164,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+ private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -1528,7 +1528,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
if(_capacity != 0l)
{
@@ -1538,10 +1538,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//Overfull log message
_logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
- if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
- {
- channel.block(this);
- }
+ _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+
+ channel.block(this);
if(_atomicQueueSize.get() <= _flowResumeCapacity)
{
@@ -1573,7 +1572,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- for(AMQChannel c : _blockedChannels.keySet())
+ for(AMQSessionModel c : _blockedChannels.keySet())
{
c.unblock(this);
_blockedChannels.remove(c);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 8e6d33d3bc..7526b19058 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.util.Serial.gt;
import java.security.Principal;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -33,8 +32,11 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
@@ -45,11 +47,13 @@ import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -66,6 +70,11 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
@@ -81,6 +90,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
+ private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
private final UUID _id;
private ConnectionConfig _connectionConfig;
@@ -88,6 +98,16 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private LogActor _actor = GenericActor.getInstance(this);
private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>();
+
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+ private ChannelLogSubject _logSubject;
+ private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE);
+
+
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -132,7 +152,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
-
+ _logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -161,6 +181,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
+ if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD)
+ {
+ invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+ }
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
PostEnqueueAction postTransactionAction;
if(isTransactional())
@@ -661,6 +685,43 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
+ }
+
+
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _oustandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+
+
+ }
+ }
+ }
+
+
public String toLogString()
{
return "[" +
@@ -701,7 +762,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
- private static class PostEnqueueAction implements ServerTransaction.Action
+ private class PostEnqueueAction implements ServerTransaction.Action
{
private List<? extends BaseQueue> _queues;
@@ -732,7 +793,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
try
{
- _queues.get(i).enqueue(_message, _transactional, null);
+ BaseQueue queue = _queues.get(i);
+ queue.enqueue(_message, _transactional, null);
+ if(queue instanceof AMQQueue)
+ {
+ ((AMQQueue)queue).checkCapacity(ServerSession.this);
+ }
+
}
catch (AMQException e)
{
@@ -756,6 +823,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public boolean getBlocking()
{
- return false; //TODO: Blocking not implemented on 0-10 yet.
+ return _blocking.get();
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index f43af447ff..8b029f9a51 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -477,7 +476,7 @@ public class MockAMQQueue implements AMQQueue
return null;
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
}