summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java77
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Range.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java22
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java23
-rwxr-xr-xjava/test-profiles/Java010Excludes3
10 files changed, 157 insertions, 30 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
index 9b357403a8..8266c1e79f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
@@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class ChannelLogSubject extends AbstractLogSubject
@@ -52,5 +55,30 @@ public class ChannelLogSubject extends AbstractLogSubject
session.getVirtualHost().getName(),
channel.getChannelId());
}
-
+
+ public ChannelLogSubject(ServerSession session)
+ {
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the required values according to
+ * these indices:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ ServerConnection connection = (ServerConnection) session.getConnection();
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ connection == null ? -1L : connection.getConnectionId(),
+ session.getAuthorizedPrincipal() == null ? "?" : session.getAuthorizedPrincipal().getName(),
+ (connection == null || connection.getConfig() == null) ? "?" : connection.getConfig().getAddress(),
+ session.getVirtualHost().getName(),
+ session.getChannel());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index bc63403a86..c55fe321fc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
public interface AMQSessionModel
{
@@ -51,4 +53,8 @@ public interface AMQSessionModel
* @param idleClose time in milliseconds before closing connection with idle transaction
*/
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException;
+
+ void block(AMQQueue queue);
+
+ void unblock(AMQQueue queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 6dfdc5e8b4..a1f1c037ec 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -217,7 +217,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
Map<String, Object> getArguments();
- void checkCapacity(AMQChannel channel);
+ void checkCapacity(AMQSessionModel channel);
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 25fc91b998..ebed781a1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -164,7 +164,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+ private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -1528,7 +1528,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
if(_capacity != 0l)
{
@@ -1538,10 +1538,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//Overfull log message
_logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
- if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
- {
- channel.block(this);
- }
+ _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+
+ channel.block(this);
if(_atomicQueueSize.get() <= _flowResumeCapacity)
{
@@ -1573,7 +1572,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- for(AMQChannel c : _blockedChannels.keySet())
+ for(AMQSessionModel c : _blockedChannels.keySet())
{
c.unblock(this);
_blockedChannels.remove(c);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 8e6d33d3bc..7526b19058 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.util.Serial.gt;
import java.security.Principal;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -33,8 +32,11 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
@@ -45,11 +47,13 @@ import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -66,6 +70,11 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
@@ -81,6 +90,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
+ private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
private final UUID _id;
private ConnectionConfig _connectionConfig;
@@ -88,6 +98,16 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private LogActor _actor = GenericActor.getInstance(this);
private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>();
+
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+ private ChannelLogSubject _logSubject;
+ private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE);
+
+
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -132,7 +152,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
-
+ _logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -161,6 +181,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
+ if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD)
+ {
+ invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+ }
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
PostEnqueueAction postTransactionAction;
if(isTransactional())
@@ -661,6 +685,43 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
+ }
+
+
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _oustandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+
+
+ }
+ }
+ }
+
+
public String toLogString()
{
return "[" +
@@ -701,7 +762,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
- private static class PostEnqueueAction implements ServerTransaction.Action
+ private class PostEnqueueAction implements ServerTransaction.Action
{
private List<? extends BaseQueue> _queues;
@@ -732,7 +793,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
try
{
- _queues.get(i).enqueue(_message, _transactional, null);
+ BaseQueue queue = _queues.get(i);
+ queue.enqueue(_message, _transactional, null);
+ if(queue instanceof AMQQueue)
+ {
+ ((AMQQueue)queue).checkCapacity(ServerSession.this);
+ }
+
}
catch (AMQException e)
{
@@ -756,6 +823,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public boolean getBlocking()
{
- return false; //TODO: Blocking not implemented on 0-10 yet.
+ return _blocking.get();
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index f43af447ff..8b029f9a51 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -477,7 +476,7 @@ public class MockAMQQueue implements AMQQueue
return null;
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Range.java b/java/common/src/main/java/org/apache/qpid/transport/Range.java
index f976337788..c47171dc4b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Range.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Range.java
@@ -185,6 +185,12 @@ public abstract class Range implements RangeSet
}
}
+ public String toString()
+ {
+ return "[" + point + ", " + point + "]";
+ }
+
+
}
private static class RangeImpl extends Range
@@ -283,7 +289,7 @@ public abstract class Range implements RangeSet
return range;
}
- @Override
+
public void remove()
{
throw new UnsupportedOperationException();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 3e823ba6fe..d391181217 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -61,7 +61,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class Session extends SessionInvoker
{
private static final Logger log = Logger.get(Session.class);
-
+
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
static class DefaultSessionListener implements SessionListener
@@ -96,6 +96,9 @@ public class Session extends SessionInvoker
private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+ private final long blockedSendTimeout = Long.getLong("qpid.flow_control_wait_failure", timeout);
+ private long blockedSendReportingPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
private boolean autoSync = false;
private boolean incomingInit;
@@ -228,10 +231,21 @@ public class Session extends SessionInvoker
{
try
{
- if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
+ long wait = blockedSendTimeout > blockedSendReportingPeriod ? blockedSendReportingPeriod :
+ blockedSendTimeout;
+ long totalWait = 1L;
+ while(totalWait <= blockedSendTimeout && !credit.tryAcquire(wait, TimeUnit.MILLISECONDS))
+ {
+ totalWait+=wait;
+ log.warn("Message send delayed by " + (totalWait)/1000 + "s due to broker enforced flow control");
+
+
+ }
+ if(totalWait > blockedSendTimeout)
{
+ log.error("Message send failed due to timeout waiting on broker enforced flow control");
throw new SessionException
- ("timed out waiting for message credit");
+ ("timed out waiting for message credit");
}
}
catch (InterruptedException e)
@@ -815,7 +829,7 @@ public class Session extends SessionInvoker
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
checkFailoverRequired("Session sync was interrupted by failover.");
- log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
+ log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, Arrays.asList(commands));
w.await();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index 775d2c3eb0..47f334adf6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -154,8 +154,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
- Thread.sleep(5000);
- List<String> results = waitAndFindMatches("QUE-1003");
+ List<String> results = waitAndFindMatches("QUE-1003", 7000);
assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
@@ -199,11 +198,13 @@ public class ProducerFlowControlTest extends AbstractTestLogging
// try to send 5 messages (should block after 4)
MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
- Thread.sleep(TIMEOUT);
List<String> results = waitAndFindMatches("Message send delayed by", TIMEOUT);
assertTrue("No delay messages logged by client",results.size()!=0);
- results = findMatches("Message send failed due to timeout waiting on broker enforced flow control");
- assertEquals("Incorrect number of send failure messages logged by client",1,results.size());
+
+ List<String> failedMessages = waitAndFindMatches("Message send failed due to timeout waiting on broker enforced"
+ + " flow control", TIMEOUT);
+ assertEquals("Incorrect number of send failure messages logged by client (got " + results.size() + " delay "
+ + "messages)",1,failedMessages.size());
@@ -325,8 +326,9 @@ public class ProducerFlowControlTest extends AbstractTestLogging
// try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+ MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 100L);
+
Thread.sleep(10000);
Exception e = sender.getException();
@@ -440,6 +442,15 @@ public class ProducerFlowControlTest extends AbstractTestLogging
e.printStackTrace();
throw new RuntimeException(e);
}
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes
index 59cb5066f1..0de4c6bae5 100755
--- a/java/test-profiles/Java010Excludes
+++ b/java/test-profiles/Java010Excludes
@@ -45,9 +45,6 @@ org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
-//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
-
//QPID-1864: rollback with subscriptions does not work in 0-10 yet
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage