diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
| commit | 9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch) | |
| tree | ae86cedd9fdcea4f49993e5a82954ccda53a1ed3 /qpid/java/common | |
| parent | 1427de0275b5db2c8619db9211435897123259d8 (diff) | |
| download | qpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz | |
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 27 |
1 files changed, 19 insertions, 8 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 110c73f718..06b606f2d3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -210,6 +210,17 @@ public class Session extends SessionInvoker } } + protected State getState() + { + return this.state; + } + + public boolean isFlowControlled() + { + return flowControl; + } + + void setFlowControl(boolean value) { flowControl = value; @@ -307,7 +318,7 @@ public class Session extends SessionInvoker xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(), header.getNonStandardProperties())); } - + } else { @@ -616,7 +627,7 @@ public class Session extends SessionInvoker { acquireCredit(); } - + synchronized (commandsLock) { if (state == DETACHED && m.isUnreliable()) @@ -732,11 +743,11 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - + boolean replayTransfer = !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); - + if ((replayTransfer) || m.hasCompletionListener()) { setCommand(next, m); @@ -833,7 +844,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - checkFailoverRequired("Session sync was interrupted by failover."); + checkFailoverRequired("Session sync was interrupted by failover."); if(log.isDebugEnabled()) { log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); @@ -871,7 +882,7 @@ public class Session extends SessionInvoker { future = results.remove(command); } - + if (future != null) { future.set(result); @@ -1039,7 +1050,7 @@ public class Session extends SessionInvoker } } - protected void awaitClose() + protected void awaitClose() { Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED) @@ -1096,7 +1107,7 @@ public class Session extends SessionInvoker if(state == CLOSED) { - connection.removeSession(this); + connection.removeSession(this); listener.closed(this); } } |
