diff options
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7a2c07b9c8..7d06dd2c22 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -47,7 +47,6 @@ import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SessionConfigType; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -90,7 +89,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); - private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30; + private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; private final UUID _id; private ConnectionConfig _connectionConfig; @@ -100,12 +99,9 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); - private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new ConcurrentHashMap<Exchange, Boolean>(); - - private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; - private final AtomicInteger _oustandingCredit = new AtomicInteger(Integer.MAX_VALUE); + private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); public static interface MessageDispositionChangeListener @@ -181,9 +177,11 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) { - if(_oustandingCredit.decrementAndGet() < HALF_INCOMING_CREDIT_THRESHOLD) + if(_outstandingCredit.get() != UNLIMITED_CREDIT + && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) { - invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD)); + _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); + invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); PostEnqueueAction postTransactionAction; @@ -712,7 +710,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi MessageFlow mf = new MessageFlow(); mf.setUnit(MessageCreditUnit.MESSAGE); mf.setDestination(""); - _oustandingCredit.set(Integer.MAX_VALUE); + _outstandingCredit.set(Integer.MAX_VALUE); mf.setValue(Integer.MAX_VALUE); invoke(mf); |
