diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-01 18:09:10 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-01 18:09:10 +0000 |
| commit | c62dbd2ed611515bf232b52455c785b1c87861f1 (patch) | |
| tree | 28e3a3816ecc18817e9000c1f7971d381e5d6c32 /qpid/java | |
| parent | f69abcd4b321f8e2d61cc1fbeb8cb20cb99ed1eb (diff) | |
| download | qpid-python-c62dbd2ed611515bf232b52455c785b1c87861f1.tar.gz | |
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@820739 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
16 files changed, 695 insertions, 29 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 180f0a992c..ea48bd7cc3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -27,13 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; @@ -42,12 +41,8 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.UnauthorizedAccessException; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.subscription.ClientDeliveryMethod; @@ -59,6 +54,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; @@ -119,6 +115,11 @@ public class AMQChannel private final AMQProtocolSession _session; private boolean _closing; + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + + private LogActor _actor; private LogSubject _logSubject; @@ -798,6 +799,7 @@ public class AMQChannel _actor.message(_logSubject, ChannelMessages.CHN_1002("Started")); } + // This section takes two different approaches to perform to perform // the same function. Ensuring that the Subscription has taken note // of the change in Channel State @@ -978,4 +980,37 @@ public class AMQChannel { return _actor; } + + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString())); + flow(false); + } + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue)) + { + if(_blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.CHN_1006()); + + flow(true); + } + } + } + + private void flow(boolean flow) + { + MethodRegistry methodRegistry = _session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow); + _session.writeFrame(responseBody.generateFrame(_channelId)); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 74bb7ee969..5c73e353de 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -108,4 +108,14 @@ public class QueueConfiguration return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap()); } + public long getCapacity() + { + return _config.getLong("capacity", _vHostConfig.getCapacity()); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index ed9d8acc08..01befbbfe8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -103,6 +103,8 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth"); envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize"); envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap"); + envVarMap.put("QPID_QUEUECAPACITY", "capacity"); + envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity"); envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer"); envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); @@ -289,7 +291,6 @@ public class ServerConfiguration implements SignalHandler return conf; } - @Override public void handle(Signal arg0) { try @@ -507,6 +508,16 @@ public class ServerConfiguration implements SignalHandler return getConfig().getLong("minimumAlertRepeatGap", 0); } + public long getCapacity() + { + return getConfig().getLong("capacity", 0L); + } + + public long getFlowResumeCapacity() + { + return getConfig().getLong("flowResumeCapacity", getCapacity()); + } + public int getProcessors() { return getConfig().getInt("connector.processors", 4); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 0273a13262..6c72025ec2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -166,4 +166,15 @@ public class VirtualHostConfiguration return _config.getLong("queues.minimumAlertRepeatGap", 0); } + + public long getCapacity() + { + return _config.getLong("queues.capacity", 0l); + } + + public long getFlowResumeCapacity() + { + return _config.getLong("queues.flowResumeCapacity", getCapacity()); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties index 5ced7cc0b9..9169a1a651 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties @@ -249,12 +249,16 @@ CHN-1003 = Close # 0 - bytes allowed in prefetch # 1 - number of messagse. CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number} +CHN-1005 = Flow Control Enforced (Queue {0}) +CHN-1006 = Flow Control Removed #Queue # 0 - owner # 1 - priority QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}] QUE-1002 = Deleted +QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number} +QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number} #Exchange # 0 - type @@ -269,4 +273,4 @@ BND-1002 = Deleted #Subscription SUB-1001 = Create[ : Durable][ : Arguments : {0}] SUB-1002 = Close -SUB-1003 = State : {0}
\ No newline at end of file +SUB-1003 = State : {0} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 2a692344d0..184504717e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -160,6 +160,17 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void setMinimumAlertRepeatGap(long value); + long getCapacity(); + + void setCapacity(long capacity); + + + long getFlowResumeCapacity(); + + void setFlowResumeCapacity(long flowResumeCapacity); + + + void deleteMessageFromTop(StoreContext storeContext) throws AMQException; long clearQueue(StoreContext storeContext) throws AMQException; @@ -180,6 +191,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> void stop(); + void checkCapacity(AMQChannel channel); + /** * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription * already exists. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 7509350e65..267ccf43ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -26,11 +26,105 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; +import java.util.HashMap; + public class AMQQueueFactory { public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities"); + private abstract static class QueueProperty + { + + private final AMQShortString _argumentName; + + + public QueueProperty(String argumentName) + { + _argumentName = new AMQShortString(argumentName); + } + + public AMQShortString getArgumentName() + { + return _argumentName; + } + + + public abstract void setPropertyValue(AMQQueue queue, Object value); + + } + + private abstract static class QueueLongProperty extends QueueProperty + { + + public QueueLongProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).longValue()); + } + + } + + abstract void setPropertyValue(AMQQueue queue, long value); + + + } + + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { + new QueueLongProperty("x-qpid-maximum-message-age") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageAge(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-size") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageSize(value); + } + }, + new QueueLongProperty("x-qpid-maximum-message-count") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumMessageCount(value); + } + }, + new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMinimumAlertRepeatGap(value); + } + }, + new QueueLongProperty("x-qpid-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setCapacity(value); + } + }, + new QueueLongProperty("x-qpid-flow-resume-capacity") + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setFlowResumeCapacity(value); + } + } + + }; + + + public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -53,6 +147,18 @@ public class AMQQueueFactory //Register the new queue virtualHost.getQueueRegistry().registerQueue(q); q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString())); + + if(arguments != null) + { + for(QueueProperty p : DECLAREABLE_PROPERTIES) + { + if(arguments.containsKey(p.getArgumentName())) + { + p.setPropertyValue(q, arguments.get(p.getArgumentName())); + } + } + } + return q; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 01b249b847..3b58f05f93 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -42,8 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; - + private final AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -191,7 +190,7 @@ public class QueueEntryImpl implements QueueEntry public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().immediateAndNotDelivered(); } public void setRedelivered(boolean b) @@ -393,4 +392,5 @@ public class QueueEntryImpl implements QueueEntry { return _queueEntryList; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 4b9fc9ca55..8b6c15c0c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,11 +1,10 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.AMQChannel; /* * @@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -122,6 +124,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private LogSubject _logSubject; private LogActor _logActor; + + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -629,6 +635,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } + checkCapacity(); + } private void decrementQueueSize(final QueueEntry entry) @@ -1173,6 +1181,58 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public void checkCapacity(AMQChannel channel) + { + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + //Overfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); + + if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) + { + channel.block(this); + } + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() + { + if(_capacity != 0L) + { + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + + for(AMQChannel c : _blockedChannels.keySet()) + { + c.unblock(this); + _blockedChannels.remove(c); + } + } + } + } + + public void deliverAsync() { Runner runner = new Runner(_stateChangeCount.incrementAndGet()); @@ -1544,6 +1604,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public void checkMessageStatus() throws AMQException { @@ -1651,6 +1712,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + } + + public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1720,6 +1802,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(config.getMaximumMessageSize()); setMaximumMessageCount(config.getMaximumMessageCount()); setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 3c71282c57..450852cef7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -97,6 +97,7 @@ public class LocalTransactionalContext implements TransactionalContext try { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); + _queue.checkCapacity(_channel); if(entry.immediateAndNotDelivered()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 28af36e3db..10d6021d27 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -91,6 +91,8 @@ public class NonTransactionalContext implements TransactionalContext public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException { QueueEntry entry = queue.enqueue(_storeContext, message); + queue.checkCapacity(_channel); + //following check implements the functionality //required by the 'immediate' flag: diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b16a289f0a..0a5274ab88 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; @@ -271,6 +272,15 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public boolean getBlockOnQueueFull() + { + return false; + } + + public void setBlockOnQueueFull(boolean block) + { + } + public long getMinimumAlertRepeatGap() { return 0; //To change body of implemented methods use File | Settings | File Templates. @@ -285,8 +295,7 @@ public class MockAMQQueue implements AMQQueue { return 0; //To change body of implemented methods use File | Settings | File Templates. } - - @Override + public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -317,6 +326,10 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } + public void checkCapacity(AMQChannel channel) + { + } + public ManagedObject getManagedObject() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -327,12 +340,31 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override public void setMinimumAlertRepeatGap(long value) { } + public long getCapacity() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setCapacity(long capacity) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getFlowResumeCapacity() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public void configure(QueueConfiguration config) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 948b21f8cf..fa15df34ec 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -115,6 +115,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -199,16 +200,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. */ - protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); /** * The default value for mandatory flag used by producers created by this session is true. That is, server will not * silently drop messages where no queue is connected to the exchange for the message. */ - protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + + /** + * The period to wait while flow controlled before declaring a failure + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; + protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); protected final boolean DECLARE_QUEUES = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); @@ -2274,7 +2291,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, false); + return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND); } private P createProducerImpl(final Destination destination, final boolean mandatory, @@ -2745,15 +2762,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); + _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); } - public void checkFlowControl() throws InterruptedException + public void checkFlowControl() throws InterruptedException, JMSException { + long expiryTime = 0L; synchronized (_flowControl) { - while (!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9c791730ca..0e3333940a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance(); private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); @@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - return false; + _channelFlowMethodHandler.methodReceived(_session, body, channelId); + return true; } public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java new file mode 100644 index 0000000000..97147904e1 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -0,0 +1,324 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +import javax.jms.*; +import javax.naming.NamingException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class ProducerFlowControlTest extends QpidTestCase +{ + private static final int TIMEOUT = 1500; + + + private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class); + + protected final String QUEUE = "ProducerFlowControl"; + + private static final int MSG_COUNT = 50; + + private Connection producerConnection; + private MessageProducer producer; + private Session producerSession; + private Queue queue; + private Connection consumerConnection; + private Session consumerSession; + + + private MessageConsumer consumer; + private final AtomicInteger _sentMessages = new AtomicInteger(); + + protected void setUp() throws Exception + { + super.setUp(); + + producerConnection = getConnection(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerConnection.start(); + + consumerConnection = getConnection(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + protected void tearDown() throws Exception + { + producerConnection.close(); + consumerConnection.close(); + super.tearDown(); + } + + public void testCapacityExceededCausesBlock() + throws JMSException, NamingException, AMQException, InterruptedException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); + + } + + + public void testFlowControlOnCapacityResumeEqual() + throws JMSException, NamingException, AMQException, InterruptedException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",1000); + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + + consumer.receive(); + + Thread.sleep(1000); + + assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); + + + } + + + public void testFlowControlSoak() + throws Exception, NamingException, AMQException, InterruptedException + { + _sentMessages.set(0); + final int numProducers = 10; + final int numMessages = 100; + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",6000); + arguments.put("x-qpid-flow-resume-capacity",3000); + + ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments); + + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue); + consumerConnection.start(); + + Connection[] producers = new Connection[numProducers]; + for(int i = 0 ; i < numProducers; i ++) + { + + producers[i] = getConnection(); + producers[i].start(); + Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer myproducer = session.createProducer(queue); + MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L); + } + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + for(int j = 0; j < numProducers * numMessages; j++) + { + + Message msg = consumer.receive(5000); + Thread.sleep(50L); + assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); + + } + + + + Message msg = consumer.receive(500); + assertNull("extra message received", msg); + + + for(int i = 0; i < numProducers; i++) + { + producers[i].close(); + } + + } + + + + public void testSendTimeout() + throws JMSException, NamingException, AMQException, InterruptedException + { + long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + System.setProperty("qpid.flow_control_wait_failure","3000"); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("x-qpid-capacity",1000); + arguments.put("x-qpid-flow-resume-capacity",800); + ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = new AMQQueue("amq.direct",QUEUE); + ((AMQSession) session).declareAndBind((AMQDestination)queue); + producer = session.createProducer(queue); + + _sentMessages.set(0); + + + // try to send 5 messages (should block after 4) + MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(10000); + + Exception e = sender.getException(); + + assertNotNull("No timeout exception on sending", e); + + System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue)); + + + + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod); + new Thread(sender).start(); + return sender; + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + _sentMessages.incrementAndGet(); + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + } + } + } + + private static final byte[] BYTE_300 = new byte[300]; + + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_300); + send.setIntProperty("msg", msg); + + return send; + } + + + private class MessageSender implements Runnable + { + private final MessageProducer _producer; + private final Session _producerSession; + private final int _numMessages; + + + + private JMSException _exception; + private long _sleepPeriod; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod) + { + _producer = producer; + _producerSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + } + + public void run() + { + try + { + sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod); + } + catch (JMSException e) + { + _exception = e; + } + } + + public JMSException getException() + { + return _exception; + } + } +}
\ No newline at end of file diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes index d27a3bca95..ac6ec4752a 100755 --- a/qpid/java/test-profiles/010Excludes +++ b/qpid/java/test-profiles/010Excludes @@ -96,3 +96,6 @@ org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#* // QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError +//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) +org.apache.qpid.server.queue.ProducerFlowControlTest + |
