diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-02 10:01:21 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-02 10:01:21 +0000 |
| commit | 2c10159e28ff85e52840d5c6964123e4c410458d (patch) | |
| tree | 8cb649778976996aee1f21963a6c5c1bad981bcc /java/broker | |
| parent | 6d4226a532443ab1fe33c7d486877dbb11e154de (diff) | |
| download | qpid-python-2c10159e28ff85e52840d5c6964123e4c410458d.tar.gz | |
QPID-3713 : Implement producer side flow control for 0-10 in Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
6 files changed, 115 insertions, 16 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java index 9b357403a8..8266c1e79f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.transport.ServerConnection; +import org.apache.qpid.server.transport.ServerSession; + import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; public class ChannelLogSubject extends AbstractLogSubject @@ -52,5 +55,30 @@ public class ChannelLogSubject extends AbstractLogSubject session.getVirtualHost().getName(), channel.getChannelId()); } - + + public ChannelLogSubject(ServerSession session) + { + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ChannelLogSubject.CHANNEL_FORMAT : + * con:{0}({1}@{2}/{3})/ch:{4} + * + * Uses a MessageFormat call to insert the required values according to + * these indices: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + * 4 - Channel ID + */ + ServerConnection connection = (ServerConnection) session.getConnection(); + setLogStringWithFormat(CHANNEL_FORMAT, + connection == null ? -1L : connection.getConnectionId(), + session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(), + (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(), + session.getVirtualHost().getName(), + session.getChannel()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index bc63403a86..c55fe321fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; public interface AMQSessionModel { @@ -51,4 +53,8 @@ public interface AMQSessionModel * @param idleClose time in milliseconds before closing connection with idle transaction */ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException; + + void block(AMQQueue queue); + + void unblock(AMQQueue queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 6dfdc5e8b4..a1f1c037ec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -217,7 +217,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer Map<String, Object> getArguments(); - void checkCapacity(AMQChannel channel); + void checkCapacity(AMQSessionModel channel); /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 25fc91b998..ebed781a1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -164,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private AtomicInteger _deliveredMessages = new AtomicInteger(); private AtomicBoolean _stopped = new AtomicBoolean(false); - private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>(); private final AtomicBoolean _deleted = new AtomicBoolean(false); private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); @@ -1528,7 +1528,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void checkCapacity(AMQChannel channel) + public void checkCapacity(AMQSessionModel channel) { if(_capacity != 0l) { @@ -1538,10 +1538,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //Overfull log message _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) - { - channel.block(this); - } + _blockedChannels.putIfAbsent(channel, Boolean.TRUE); + + channel.block(this); if(_atomicQueueSize.get() <= _flowResumeCapacity) { @@ -1573,7 +1572,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - for(AMQChannel c : _blockedChannels.keySet()) + for(AMQSessionModel c : _blockedChannels.keySet()) { c.unblock(this); _blockedChannels.remove(c); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 8e6d33d3bc..7526b19058 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -25,7 +25,6 @@ import static org.apache.qpid.util.Serial.gt; import java.security.Principal; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -33,8 +32,11 @@ import java.util.Map; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.AMQException; @@ -45,11 +47,13 @@ import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -66,6 +70,11 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlow; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageSetFlowMode; +import org.apache.qpid.transport.MessageStop; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Range; @@ -81,6 +90,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); + private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30; private final UUID _id; private ConnectionConfig _connectionConfig; @@ -88,6 +98,16 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private LogActor _actor = GenericActor.getInstance(this); private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction(); + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>(); + + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + private ChannelLogSubject _logSubject; + private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE); + + public static interface MessageDispositionChangeListener { public void onAccept(); @@ -132,7 +152,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi super(connection, delegate, name, expiry); _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); - + _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -161,6 +181,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) { + if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD) + { + invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD)); + } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); PostEnqueueAction postTransactionAction; if(isTransactional()) @@ -661,6 +685,43 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); + invoke(new MessageStop("")); + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); + } + + + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) + { + if(_blocking.compareAndSet(true,false)) + { + + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _oustandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + + + } + } + } + + public String toLogString() { return "[" + @@ -701,7 +762,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - private static class PostEnqueueAction implements ServerTransaction.Action + private class PostEnqueueAction implements ServerTransaction.Action { private List<? extends BaseQueue> _queues; @@ -732,7 +793,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { try { - _queues.get(i).enqueue(_message, _transactional, null); + BaseQueue queue = _queues.get(i); + queue.enqueue(_message, _transactional, null); + if(queue instanceof AMQQueue) + { + ((AMQQueue)queue).checkCapacity(ServerSession.this); + } + } catch (AMQException e) { @@ -756,6 +823,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public boolean getBlocking() { - return false; //TODO: Blocking not implemented on 0-10 yet. + return _blocking.get(); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index f43af447ff..8b029f9a51 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.security.AuthorizationHolder; -import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; @@ -477,7 +476,7 @@ public class MockAMQQueue implements AMQQueue return null; } - public void checkCapacity(AMQChannel channel) + public void checkCapacity(AMQSessionModel channel) { } |
