summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-10 18:19:20 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-10 18:19:20 +0000
commitfd81ba29a5d1d27eb9c48684206a71a63fa77e57 (patch)
tree56fe811a7319806cf8d683b7e66d9f53f02a2ca9 /qpid/java/common
parentc27ca618cf6fe22b0ab6516fddb8529d58024ea9 (diff)
downloadqpid-python-fd81ba29a5d1d27eb9c48684206a71a63fa77e57.tar.gz
made the session usable from multiple threads (hopefully)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@583567 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java49
1 files changed, 31 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 609d029c6c..73da0ddf4d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -177,6 +177,8 @@ public class Session extends Invoker
void complete(long lower, long upper)
{
+ log.debug("%s complete(%d, %d)", this, lower, upper);
+
synchronized (commands)
{
for (long id = lower; id <= upper; id++)
@@ -184,18 +186,19 @@ public class Session extends Invoker
commands.remove(id);
}
- if (commands.isEmpty())
- {
- log.debug("%s no outsanding commands", this);
- commands.notifyAll();
- }
+ commands.notifyAll();
+ log.debug("%s commands remaining: %s", this, commands);
}
}
void complete(long mark)
{
- complete(this.mark, mark);
- this.mark = mark;
+ synchronized (commands)
+ {
+ complete(this.mark, mark);
+ this.mark = mark;
+ commands.notifyAll();
+ }
}
protected void invoke(Method m)
@@ -205,9 +208,13 @@ public class Session extends Invoker
synchronized (commands)
{
commands.put(commandsOut++, m);
+ channel.method(m);
}
}
- channel.method(m);
+ else
+ {
+ channel.method(m);
+ }
}
public void header(Header header)
@@ -250,15 +257,17 @@ public class Session extends Invoker
log.debug("%s sync()", this);
synchronized (commands)
{
- if (!commands.isEmpty())
+ long point = commandsOut - 1;
+
+ if (mark < point)
{
executionSync();
}
- while (!closed.get() && !commands.isEmpty())
+ while (!closed.get() && mark < point)
{
try {
- log.debug("%s waiting", this);
+ log.debug("%s waiting for[%d]: %s", this, point, commands);
commands.wait();
}
catch (InterruptedException e)
@@ -267,7 +276,7 @@ public class Session extends Invoker
}
}
- if (!commands.isEmpty())
+ if (mark < point)
{
throw new RuntimeException("session closed");
}
@@ -286,16 +295,20 @@ public class Session extends Invoker
}
future.set(result);
}
+
protected <T> Future<T> invoke(Method m, Class<T> klass)
{
- long command = commandsOut;
- ResultFuture<T> future = new ResultFuture<T>(klass);
- synchronized (results)
+ synchronized (commands)
{
- results.put(command, future);
+ long command = commandsOut;
+ ResultFuture<T> future = new ResultFuture<T>(klass);
+ synchronized (results)
+ {
+ results.put(command, future);
+ }
+ invoke(m);
+ return future;
}
- invoke(m);
- return future;
}
private class ResultFuture<T> implements Future<T>