diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-10-05 15:58:39 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-10-05 15:58:39 +0000 |
| commit | 7defea948eda5d75e2cb665b2c46ab9aecbb73e0 (patch) | |
| tree | caff3381308f7a23858b0338878d52dcad77b486 | |
| parent | cdb7bf84ff882828b0db6ff9aa47de73bc02a41f (diff) | |
| download | qpid-python-7defea948eda5d75e2cb665b2c46ab9aecbb73e0.tar.gz | |
don't wait for results if the session is closed, also added missing synchronization for flushProcessed(), and fixed a buggy log statement
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@582320 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/Session.java | 48 |
1 files changed, 35 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 06abe14278..609d029c6c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** @@ -57,7 +58,7 @@ public class Session extends Invoker private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; - private boolean closed = false; + private AtomicBoolean closed = new AtomicBoolean(false); public Map<Long,Method> getOutstandingCommands() @@ -122,16 +123,19 @@ public class Session extends Invoker long mark = -1; boolean first = true; RangeSet rest = new RangeSet(); - for (Range r: processed) + synchronized (processed) { - if (first) - { - first = false; - mark = r.getUpper(); - } - else + for (Range r: processed) { - rest.add(r); + if (first) + { + first = false; + mark = r.getUpper(); + } + else + { + rest.add(r); + } } } executionComplete(mark, rest); @@ -251,10 +255,10 @@ public class Session extends Invoker executionSync(); } - while (!closed && !commands.isEmpty()) + while (!closed.get() && !commands.isEmpty()) { try { - log.debug("%s waiting"); + log.debug("%s waiting", this); commands.wait(); } catch (InterruptedException e) @@ -318,10 +322,11 @@ public class Session extends Invoker { synchronized (this) { - while (!isDone()) + while (!closed.get() && !isDone()) { try { + log.debug("%s waiting for result: %s", Session.this, this); wait(timeout, nanos); } catch (InterruptedException e) @@ -331,6 +336,11 @@ public class Session extends Invoker } } + if (!isDone()) + { + throw new RuntimeException("session closed"); + } + return result; } @@ -349,6 +359,11 @@ public class Session extends Invoker return result != null; } + public String toString() + { + return String.format("Future(%s)", isDone() ? result : klass); + } + } public void close() @@ -359,11 +374,18 @@ public class Session extends Invoker public void closed() { + closed.set(true); synchronized (commands) { - closed = true; commands.notifyAll(); } + synchronized (results) + { + for (ResultFuture<?> result : results.values()) + { + result.notifyAll(); + } + } } } |
