summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-08-08 22:32:03 +0000
committerRafael H. Schloming <rhs@apache.org>2007-08-08 22:32:03 +0000
commitb3eb629f662dc2df11ce3981a7c4cc8ab58a8c28 (patch)
treedff9da293fffbb2a4f2360bb6b0b6b01a9a220ef /java/common
parent92a4b76fcbedb01d79658e8f84c31c39f92d98e4 (diff)
downloadqpid-python-b3eb629f662dc2df11ce3981a7c4cc8ab58a8c28.tar.gz
fixed executionSync
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/RangeSet.java13
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java43
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java7
4 files changed, 61 insertions, 9 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
index 4e654f706b..56c73d1f00 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
@@ -33,6 +33,7 @@ class ProtocolHeader
{
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
+ private static final byte CLASS = 1;
final private byte instance;
final private byte major;
@@ -64,9 +65,9 @@ class ProtocolHeader
{
ByteBuffer buf = ByteBuffer.allocate(8);
buf.put(AMQP);
- buf.put((byte) 1);
- buf.put((byte) 1);
- buf.put( major);
+ buf.put(CLASS);
+ buf.put(instance);
+ buf.put(major);
buf.put(minor);
buf.flip();
return buf;
diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/RangeSet.java
index 297df7c0c9..2e79ee8a72 100644
--- a/java/common/src/main/java/org/apache/qpidity/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpidity/RangeSet.java
@@ -47,6 +47,19 @@ public class RangeSet implements Iterable<Range>
return ranges.iterator();
}
+ public boolean includes(Range range)
+ {
+ for (Range r : this)
+ {
+ if (r.includes(range))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public void add(Range range)
{
ListIterator<Range> it = ranges.listIterator();
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index 067fc8c83c..c8b3c7c5bb 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -39,6 +39,7 @@ public class Session extends Invoker
private long commandsIn = 0;
// completed incoming commands
private final RangeSet processed = new RangeSet();
+ private Range syncPoint = null;
// outgoing command count
private long commandsOut = 0;
@@ -70,24 +71,56 @@ public class Session extends Invoker
return processed;
}
+ public void processed(Method command)
+ {
+ processed(command.getId());
+ }
+
public void processed(long command)
{
- processed.add(command);
+ processed(new Range(command, command));
}
public void processed(long lower, long upper)
{
- processed.add(lower, upper);
+ processed(new Range(lower, upper));
}
public void processed(Range range)
{
- processed.add(range);
+ boolean flush;
+ synchronized (processed)
+ {
+ processed.add(range);
+ flush = syncPoint != null && processed.includes(syncPoint);
+ }
+ if (flush)
+ {
+ flushProcessed();
+ }
}
- public void processed(Method command)
+ void flushProcessed()
{
- processed(command.getId());
+ executionComplete(0, processed);
+ }
+
+ void syncPoint()
+ {
+ Range range = new Range(0, getCommandsIn() - 1);
+ boolean flush;
+ synchronized (processed)
+ {
+ flush = processed.includes(range);
+ if (!flush)
+ {
+ syncPoint = range;
+ }
+ }
+ if (flush)
+ {
+ flushProcessed();
+ }
}
public void attach(Channel channel)
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index fd3e019367..5d62a57e93 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -48,9 +48,14 @@ public abstract class SessionDelegate extends Delegate<Session>
System.out.println("outstanding commands: " + ssn.getOutstandingCommands());
}
+ @Override public void executionFlush(Session ssn, ExecutionFlush flush)
+ {
+ ssn.flushProcessed();
+ }
+
@Override public void executionSync(Session ssn, ExecutionSync sync)
{
- ssn.executionComplete(0, ssn.getProcessed());
+ ssn.syncPoint();
}
}