diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-06-03 19:12:03 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-06-03 19:12:03 +0000 |
| commit | 3ceea332088f4ccf7a3767b1d0e7879c27adcca0 (patch) | |
| tree | fb74f3c32243d59676ea68b17cb26387eebffe42 /qpid/java/common/src | |
| parent | a60d332b358f41205f1f862dc2de9d15b2802ded (diff) | |
| download | qpid-python-3ceea332088f4ccf7a3767b1d0e7879c27adcca0.tar.gz | |
QPID-901: honor the timely-reply flag and handle known-completed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@662859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java | 34 | ||||
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java | 13 |
2 files changed, 38 insertions, 9 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java index 4e82e893d7..a7de66c1a7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -72,7 +72,8 @@ public class Session extends Invoker // incoming command count int commandsIn = 0; // completed incoming commands - private final RangeSet processed = new RangeSet(); + private final Object processedLock = new Object(); + private RangeSet processed = new RangeSet(); private Range syncPoint = null; // outgoing command count @@ -112,11 +113,6 @@ public class Session extends Invoker return commandsIn++; } - public RangeSet getProcessed() - { - return processed; - } - public void processed(Method command) { processed(command.getId()); @@ -138,7 +134,7 @@ public class Session extends Invoker log.debug("%s processed(%s)", this, range); boolean flush; - synchronized (processed) + synchronized (processedLock) { processed.add(range); flush = syncPoint != null && processed.includes(syncPoint); @@ -152,20 +148,40 @@ public class Session extends Invoker public void flushProcessed() { RangeSet copy; - synchronized (processed) + synchronized (processedLock) { copy = processed.copy(); } sessionCompleted(copy); } + void knownComplete(RangeSet kc) + { + synchronized (processedLock) + { + RangeSet newProcessed = new RangeSet(); + OUTER: for (Range r : processed) + { + for (Range kr : kc) + { + if (kr.includes(r)) + { + continue OUTER; + } + } + newProcessed.add(r); + } + this.processed = newProcessed; + } + } + void syncPoint() { int id = getCommandsIn() - 1; log.debug("%s synced to %d", this, id); Range range = new Range(0, id - 1); boolean flush; - synchronized (processed) + synchronized (processedLock) { flush = processed.includes(range); if (!flush) diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java index 8d9b2e730e..deb3ee156d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java @@ -74,6 +74,19 @@ public abstract class SessionDelegate ssn.complete(range.getLower(), range.getUpper()); } } + if (cmp.getTimelyReply()) + { + ssn.sessionKnownCompleted(ranges); + } + } + + @Override public void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp) + { + RangeSet kc = kcmp.getCommands(); + if (kc != null) + { + ssn.knownComplete(kc); + } } @Override public void sessionFlush(Session ssn, SessionFlush flush) |
