summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-10 18:19:20 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-10 18:19:20 +0000
commitea6fda30b82c1f16a5be4157a6978348cb187a94 (patch)
tree0e2829f1a8157916f855d6050ae1009a55e6081c /java/common/src
parent451f197c121b0124cd8171a9e0b37fe1a61cd3ed (diff)
downloadqpid-python-ea6fda30b82c1f16a5be4157a6978348cb187a94.tar.gz
made the session usable from multiple threads (hopefully)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@583567 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java49
1 files changed, 31 insertions, 18 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 609d029c6c..73da0ddf4d 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -177,6 +177,8 @@ public class Session extends Invoker
void complete(long lower, long upper)
{
+ log.debug("%s complete(%d, %d)", this, lower, upper);
+
synchronized (commands)
{
for (long id = lower; id <= upper; id++)
@@ -184,18 +186,19 @@ public class Session extends Invoker
commands.remove(id);
}
- if (commands.isEmpty())
- {
- log.debug("%s no outsanding commands", this);
- commands.notifyAll();
- }
+ commands.notifyAll();
+ log.debug("%s commands remaining: %s", this, commands);
}
}
void complete(long mark)
{
- complete(this.mark, mark);
- this.mark = mark;
+ synchronized (commands)
+ {
+ complete(this.mark, mark);
+ this.mark = mark;
+ commands.notifyAll();
+ }
}
protected void invoke(Method m)
@@ -205,9 +208,13 @@ public class Session extends Invoker
synchronized (commands)
{
commands.put(commandsOut++, m);
+ channel.method(m);
}
}
- channel.method(m);
+ else
+ {
+ channel.method(m);
+ }
}
public void header(Header header)
@@ -250,15 +257,17 @@ public class Session extends Invoker
log.debug("%s sync()", this);
synchronized (commands)
{
- if (!commands.isEmpty())
+ long point = commandsOut - 1;
+
+ if (mark < point)
{
executionSync();
}
- while (!closed.get() && !commands.isEmpty())
+ while (!closed.get() && mark < point)
{
try {
- log.debug("%s waiting", this);
+ log.debug("%s waiting for[%d]: %s", this, point, commands);
commands.wait();
}
catch (InterruptedException e)
@@ -267,7 +276,7 @@ public class Session extends Invoker
}
}
- if (!commands.isEmpty())
+ if (mark < point)
{
throw new RuntimeException("session closed");
}
@@ -286,16 +295,20 @@ public class Session extends Invoker
}
future.set(result);
}
+
protected <T> Future<T> invoke(Method m, Class<T> klass)
{
- long command = commandsOut;
- ResultFuture<T> future = new ResultFuture<T>(klass);
- synchronized (results)
+ synchronized (commands)
{
- results.put(command, future);
+ long command = commandsOut;
+ ResultFuture<T> future = new ResultFuture<T>(klass);
+ synchronized (results)
+ {
+ results.put(command, future);
+ }
+ invoke(m);
+ return future;
}
- invoke(m);
- return future;
}
private class ResultFuture<T> implements Future<T>