diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-08-08 22:32:03 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-08-08 22:32:03 +0000 |
| commit | b3eb629f662dc2df11ce3981a7c4cc8ab58a8c28 (patch) | |
| tree | dff9da293fffbb2a4f2360bb6b0b6b01a9a220ef /java | |
| parent | 92a4b76fcbedb01d79658e8f84c31c39f92d98e4 (diff) | |
| download | qpid-python-b3eb629f662dc2df11ce3981a7c4cc8ab58a8c28.tar.gz | |
fixed executionSync
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 61 insertions, 9 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java index 4e654f706b..56c73d1f00 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java @@ -33,6 +33,7 @@ class ProtocolHeader { private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; + private static final byte CLASS = 1; final private byte instance; final private byte major; @@ -64,9 +65,9 @@ class ProtocolHeader { ByteBuffer buf = ByteBuffer.allocate(8); buf.put(AMQP); - buf.put((byte) 1); - buf.put((byte) 1); - buf.put( major); + buf.put(CLASS); + buf.put(instance); + buf.put(major); buf.put(minor); buf.flip(); return buf; diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/RangeSet.java index 297df7c0c9..2e79ee8a72 100644 --- a/java/common/src/main/java/org/apache/qpidity/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpidity/RangeSet.java @@ -47,6 +47,19 @@ public class RangeSet implements Iterable<Range> return ranges.iterator(); } + public boolean includes(Range range) + { + for (Range r : this) + { + if (r.includes(range)) + { + return true; + } + } + + return false; + } + public void add(Range range) { ListIterator<Range> it = ranges.listIterator(); diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index 067fc8c83c..c8b3c7c5bb 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -39,6 +39,7 @@ public class Session extends Invoker private long commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); + private Range syncPoint = null; // outgoing command count private long commandsOut = 0; @@ -70,24 +71,56 @@ public class Session extends Invoker return processed; } + public void processed(Method command) + { + processed(command.getId()); + } + public void processed(long command) { - processed.add(command); + processed(new Range(command, command)); } public void processed(long lower, long upper) { - processed.add(lower, upper); + processed(new Range(lower, upper)); } public void processed(Range range) { - processed.add(range); + boolean flush; + synchronized (processed) + { + processed.add(range); + flush = syncPoint != null && processed.includes(syncPoint); + } + if (flush) + { + flushProcessed(); + } } - public void processed(Method command) + void flushProcessed() { - processed(command.getId()); + executionComplete(0, processed); + } + + void syncPoint() + { + Range range = new Range(0, getCommandsIn() - 1); + boolean flush; + synchronized (processed) + { + flush = processed.includes(range); + if (!flush) + { + syncPoint = range; + } + } + if (flush) + { + flushProcessed(); + } } public void attach(Channel channel) diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index fd3e019367..5d62a57e93 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -48,9 +48,14 @@ public abstract class SessionDelegate extends Delegate<Session> System.out.println("outstanding commands: " + ssn.getOutstandingCommands()); } + @Override public void executionFlush(Session ssn, ExecutionFlush flush) + { + ssn.flushProcessed(); + } + @Override public void executionSync(Session ssn, ExecutionSync sync) { - ssn.executionComplete(0, ssn.getProcessed()); + ssn.syncPoint(); } } |
