summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
commit9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch)
treeae86cedd9fdcea4f49993e5a82954ccda53a1ed3 /qpid/java/common
parent1427de0275b5db2c8619db9211435897123259d8 (diff)
downloadqpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java27
1 files changed, 19 insertions, 8 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 110c73f718..06b606f2d3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -210,6 +210,17 @@ public class Session extends SessionInvoker
}
}
+ protected State getState()
+ {
+ return this.state;
+ }
+
+ public boolean isFlowControlled()
+ {
+ return flowControl;
+ }
+
+
void setFlowControl(boolean value)
{
flowControl = value;
@@ -307,7 +318,7 @@ public class Session extends SessionInvoker
xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
header.getNonStandardProperties()));
}
-
+
}
else
{
@@ -616,7 +627,7 @@ public class Session extends SessionInvoker
{
acquireCredit();
}
-
+
synchronized (commandsLock)
{
if (state == DETACHED && m.isUnreliable())
@@ -732,11 +743,11 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
-
+
boolean replayTransfer = !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();
-
+
if ((replayTransfer) || m.hasCompletionListener())
{
setCommand(next, m);
@@ -833,7 +844,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- checkFailoverRequired("Session sync was interrupted by failover.");
+ checkFailoverRequired("Session sync was interrupted by failover.");
if(log.isDebugEnabled())
{
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
@@ -871,7 +882,7 @@ public class Session extends SessionInvoker
{
future = results.remove(command);
}
-
+
if (future != null)
{
future.set(result);
@@ -1039,7 +1050,7 @@ public class Session extends SessionInvoker
}
}
- protected void awaitClose()
+ protected void awaitClose()
{
Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED)
@@ -1096,7 +1107,7 @@ public class Session extends SessionInvoker
if(state == CLOSED)
{
- connection.removeSession(this);
+ connection.removeSession(this);
listener.closed(this);
}
}