summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-06-03 19:12:03 +0000
committerRafael H. Schloming <rhs@apache.org>2008-06-03 19:12:03 +0000
commit3ceea332088f4ccf7a3767b1d0e7879c27adcca0 (patch)
treefb74f3c32243d59676ea68b17cb26387eebffe42 /qpid/java/common/src
parenta60d332b358f41205f1f862dc2de9d15b2802ded (diff)
downloadqpid-python-3ceea332088f4ccf7a3767b1d0e7879c27adcca0.tar.gz
QPID-901: honor the timely-reply flag and handle known-completed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@662859 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java13
2 files changed, 38 insertions, 9 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index 4e82e893d7..a7de66c1a7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -72,7 +72,8 @@ public class Session extends Invoker
// incoming command count
int commandsIn = 0;
// completed incoming commands
- private final RangeSet processed = new RangeSet();
+ private final Object processedLock = new Object();
+ private RangeSet processed = new RangeSet();
private Range syncPoint = null;
// outgoing command count
@@ -112,11 +113,6 @@ public class Session extends Invoker
return commandsIn++;
}
- public RangeSet getProcessed()
- {
- return processed;
- }
-
public void processed(Method command)
{
processed(command.getId());
@@ -138,7 +134,7 @@ public class Session extends Invoker
log.debug("%s processed(%s)", this, range);
boolean flush;
- synchronized (processed)
+ synchronized (processedLock)
{
processed.add(range);
flush = syncPoint != null && processed.includes(syncPoint);
@@ -152,20 +148,40 @@ public class Session extends Invoker
public void flushProcessed()
{
RangeSet copy;
- synchronized (processed)
+ synchronized (processedLock)
{
copy = processed.copy();
}
sessionCompleted(copy);
}
+ void knownComplete(RangeSet kc)
+ {
+ synchronized (processedLock)
+ {
+ RangeSet newProcessed = new RangeSet();
+ OUTER: for (Range r : processed)
+ {
+ for (Range kr : kc)
+ {
+ if (kr.includes(r))
+ {
+ continue OUTER;
+ }
+ }
+ newProcessed.add(r);
+ }
+ this.processed = newProcessed;
+ }
+ }
+
void syncPoint()
{
int id = getCommandsIn() - 1;
log.debug("%s synced to %d", this, id);
Range range = new Range(0, id - 1);
boolean flush;
- synchronized (processed)
+ synchronized (processedLock)
{
flush = processed.includes(range);
if (!flush)
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
index 8d9b2e730e..deb3ee156d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
@@ -74,6 +74,19 @@ public abstract class SessionDelegate
ssn.complete(range.getLower(), range.getUpper());
}
}
+ if (cmp.getTimelyReply())
+ {
+ ssn.sessionKnownCompleted(ranges);
+ }
+ }
+
+ @Override public void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp)
+ {
+ RangeSet kc = kcmp.getCommands();
+ if (kc != null)
+ {
+ ssn.knownComplete(kc);
+ }
}
@Override public void sessionFlush(Session ssn, SessionFlush flush)