diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/Session.java | 48 |
1 files changed, 35 insertions, 13 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 06abe14278..609d029c6c 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** @@ -57,7 +58,7 @@ public class Session extends Invoker private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; - private boolean closed = false; + private AtomicBoolean closed = new AtomicBoolean(false); public Map<Long,Method> getOutstandingCommands() @@ -122,16 +123,19 @@ public class Session extends Invoker long mark = -1; boolean first = true; RangeSet rest = new RangeSet(); - for (Range r: processed) + synchronized (processed) { - if (first) - { - first = false; - mark = r.getUpper(); - } - else + for (Range r: processed) { - rest.add(r); + if (first) + { + first = false; + mark = r.getUpper(); + } + else + { + rest.add(r); + } } } executionComplete(mark, rest); @@ -251,10 +255,10 @@ public class Session extends Invoker executionSync(); } - while (!closed && !commands.isEmpty()) + while (!closed.get() && !commands.isEmpty()) { try { - log.debug("%s waiting"); + log.debug("%s waiting", this); commands.wait(); } catch (InterruptedException e) @@ -318,10 +322,11 @@ public class Session extends Invoker { synchronized (this) { - while (!isDone()) + while (!closed.get() && !isDone()) { try { + log.debug("%s waiting for result: %s", Session.this, this); wait(timeout, nanos); } catch (InterruptedException e) @@ -331,6 +336,11 @@ public class Session extends Invoker } } + if (!isDone()) + { + throw new RuntimeException("session closed"); + } + return result; } @@ -349,6 +359,11 @@ public class Session extends Invoker return result != null; } + public String toString() + { + return String.format("Future(%s)", isDone() ? result : klass); + } + } public void close() @@ -359,11 +374,18 @@ public class Session extends Invoker public void closed() { + closed.set(true); synchronized (commands) { - closed = true; commands.notifyAll(); } + synchronized (results) + { + for (ResultFuture<?> result : results.values()) + { + result.notifyAll(); + } + } } } |
