summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-11 09:52:16 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-11 09:52:16 +0000
commit0f81a00b5cb061780b1aac0d2ff216e2c16de0ec (patch)
tree78d645f9afccb1dd48047512c40b8996b407cb13 /qpid/java
parentb7fcbdb8db0f9d210c107db77cf5d16af366b3fd (diff)
downloadqpid-python-0f81a00b5cb061780b1aac0d2ff216e2c16de0ec.tar.gz
QPID-3717 - Fixes based on review by Robbie Gemmell
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1229943 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java16
1 files changed, 7 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7a2c07b9c8..7d06dd2c22 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -47,7 +47,6 @@ import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -90,7 +89,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
- private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
+ private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private final UUID _id;
private ConnectionConfig _connectionConfig;
@@ -100,12 +99,9 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
- private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>();
-
-
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
- private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE);
+ private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
public static interface MessageDispositionChangeListener
@@ -181,9 +177,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
- if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD)
+ if(_outstandingCredit.get() != UNLIMITED_CREDIT
+ && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
{
- invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+ _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
+ invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
PostEnqueueAction postTransactionAction;
@@ -712,7 +710,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
MessageFlow mf = new MessageFlow();
mf.setUnit(MessageCreditUnit.MESSAGE);
mf.setDestination("");
- _oustandingCredit.set(Integer.MAX_VALUE);
+ _outstandingCredit.set(Integer.MAX_VALUE);
mf.setValue(Integer.MAX_VALUE);
invoke(mf);