summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-10-05 15:58:39 +0000
committerRafael H. Schloming <rhs@apache.org>2007-10-05 15:58:39 +0000
commit7defea948eda5d75e2cb665b2c46ab9aecbb73e0 (patch)
treecaff3381308f7a23858b0338878d52dcad77b486
parentcdb7bf84ff882828b0db6ff9aa47de73bc02a41f (diff)
downloadqpid-python-7defea948eda5d75e2cb665b2c46ab9aecbb73e0.tar.gz
don't wait for results if the session is closed, also added missing synchronization for flushProcessed(), and fixed a buggy log statement
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@582320 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java48
1 files changed, 35 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 06abe14278..609d029c6c 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -57,7 +58,7 @@ public class Session extends Invoker
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
- private boolean closed = false;
+ private AtomicBoolean closed = new AtomicBoolean(false);
public Map<Long,Method> getOutstandingCommands()
@@ -122,16 +123,19 @@ public class Session extends Invoker
long mark = -1;
boolean first = true;
RangeSet rest = new RangeSet();
- for (Range r: processed)
+ synchronized (processed)
{
- if (first)
- {
- first = false;
- mark = r.getUpper();
- }
- else
+ for (Range r: processed)
{
- rest.add(r);
+ if (first)
+ {
+ first = false;
+ mark = r.getUpper();
+ }
+ else
+ {
+ rest.add(r);
+ }
}
}
executionComplete(mark, rest);
@@ -251,10 +255,10 @@ public class Session extends Invoker
executionSync();
}
- while (!closed && !commands.isEmpty())
+ while (!closed.get() && !commands.isEmpty())
{
try {
- log.debug("%s waiting");
+ log.debug("%s waiting", this);
commands.wait();
}
catch (InterruptedException e)
@@ -318,10 +322,11 @@ public class Session extends Invoker
{
synchronized (this)
{
- while (!isDone())
+ while (!closed.get() && !isDone())
{
try
{
+ log.debug("%s waiting for result: %s", Session.this, this);
wait(timeout, nanos);
}
catch (InterruptedException e)
@@ -331,6 +336,11 @@ public class Session extends Invoker
}
}
+ if (!isDone())
+ {
+ throw new RuntimeException("session closed");
+ }
+
return result;
}
@@ -349,6 +359,11 @@ public class Session extends Invoker
return result != null;
}
+ public String toString()
+ {
+ return String.format("Future(%s)", isDone() ? result : klass);
+ }
+
}
public void close()
@@ -359,11 +374,18 @@ public class Session extends Invoker
public void closed()
{
+ closed.set(true);
synchronized (commands)
{
- closed = true;
commands.notifyAll();
}
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ result.notifyAll();
+ }
+ }
}
}