summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-08-08 04:36:31 +0000
committerRafael H. Schloming <rhs@apache.org>2007-08-08 04:36:31 +0000
commitdff8b876caa9625dc0e03e03dadad22a504826d1 (patch)
tree6afa9f8388c40603fc0423e96e2555a9455b83d6 /java/common/src
parenta45694048d1f26e0ed317f661b464bae862fb8fa (diff)
downloadqpid-python-dff8b876caa9625dc0e03e03dadad22a504826d1.tar.gz
implemented Session.sync()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Decoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Encoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Range.java46
-rw-r--r--java/common/src/main/java/org/apache/qpidity/RangeSet.java110
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java78
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java4
10 files changed, 242 insertions, 27 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
index 9db419537c..ca728812bd 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
@@ -143,7 +143,7 @@ abstract class AbstractDecoder implements Decoder
return null;
}
- public Range<Long>[] readRfc1982LongSet()
+ public RangeSet readRfc1982LongSet()
{
int count = readShort()/8;
if (count == 0)
@@ -152,10 +152,10 @@ abstract class AbstractDecoder implements Decoder
}
else
{
- Range<Long>[] ranges = new Range[count];
+ RangeSet ranges = new RangeSet();
for (int i = 0; i < count; i++)
{
- ranges[i] = new Range<Long>(readLong(), readLong());
+ ranges.add(readLong(), readLong());
}
return ranges;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
index f898d759d5..ecd274615b 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
@@ -150,7 +150,7 @@ abstract class AbstractEncoder implements Encoder
//throw new Error("TODO");
}
- public void writeRfc1982LongSet(Range<Long>[] ranges)
+ public void writeRfc1982LongSet(RangeSet ranges)
{
if (ranges == null)
{
@@ -158,8 +158,8 @@ abstract class AbstractEncoder implements Encoder
}
else
{
- writeShort(ranges.length * 8);
- for (Range<Long> range : ranges)
+ writeShort(ranges.size() * 8);
+ for (Range range : ranges)
{
writeLong(range.getLower());
writeLong(range.getUpper());
diff --git a/java/common/src/main/java/org/apache/qpidity/Decoder.java b/java/common/src/main/java/org/apache/qpidity/Decoder.java
index 0a869baab7..13c3d0b7b8 100644
--- a/java/common/src/main/java/org/apache/qpidity/Decoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/Decoder.java
@@ -44,7 +44,7 @@ public interface Decoder
String readLongstr();
Map<String,?> readTable();
- Range<Long>[] readRfc1982LongSet();
+ RangeSet readRfc1982LongSet();
UUID readUuid();
String readContent();
diff --git a/java/common/src/main/java/org/apache/qpidity/Encoder.java b/java/common/src/main/java/org/apache/qpidity/Encoder.java
index c25b96e462..990cef7081 100644
--- a/java/common/src/main/java/org/apache/qpidity/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/Encoder.java
@@ -44,7 +44,7 @@ public interface Encoder
void writeLongstr(String s);
void writeTable(Map<String,?> table);
- void writeRfc1982LongSet(Range<Long>[] ranges);
+ void writeRfc1982LongSet(RangeSet ranges);
void writeUuid(UUID uuid);
void writeContent(String c);
diff --git a/java/common/src/main/java/org/apache/qpidity/Range.java b/java/common/src/main/java/org/apache/qpidity/Range.java
index 4766ecb471..9da7112a6d 100644
--- a/java/common/src/main/java/org/apache/qpidity/Range.java
+++ b/java/common/src/main/java/org/apache/qpidity/Range.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpidity;
+import static java.lang.Math.*;
+
/**
* Range
@@ -27,25 +29,57 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-public class Range<C extends Comparable>
+public class Range
{
- private final C lower;
- private final C upper;
+ private final long lower;
+ private final long upper;
- public Range(C lower, C upper)
+ public Range(long lower, long upper)
{
this.lower = lower;
this.upper = upper;
}
- public C getLower()
+ public long getLower()
{
return lower;
}
- public C getUpper()
+ public long getUpper()
{
return upper;
}
+ public boolean includes(long value)
+ {
+ return lower <= value && value <= upper;
+ }
+
+ public boolean includes(Range range)
+ {
+ return includes(range.lower) && includes(range.upper);
+ }
+
+ public boolean intersects(Range range)
+ {
+ return (includes(range.lower) || includes(range.upper) ||
+ range.includes(lower) || range.includes(upper));
+ }
+
+ public boolean touches(Range range)
+ {
+ return (includes(range.upper + 1) || includes(range.lower - 1) ||
+ range.includes(upper + 1) || range.includes(lower - 1));
+ }
+
+ public Range span(Range range)
+ {
+ return new Range(min(lower, range.lower), max(upper, range.upper));
+ }
+
+ public String toString()
+ {
+ return "[" + lower + ", " + upper + "]";
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/RangeSet.java
new file mode 100644
index 0000000000..297df7c0c9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/RangeSet.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.LinkedList;
+
+
+/**
+ * RangeSet
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class RangeSet implements Iterable<Range>
+{
+
+ private LinkedList<Range> ranges = new LinkedList<Range>();
+
+ public int size()
+ {
+ return ranges.size();
+ }
+
+ public Iterator<Range> iterator()
+ {
+ return ranges.iterator();
+ }
+
+ public void add(Range range)
+ {
+ ListIterator<Range> it = ranges.listIterator();
+
+ while (it.hasNext())
+ {
+ Range next = it.next();
+ if (range.touches(next))
+ {
+ it.remove();
+ range = range.span(next);
+ }
+ else if (range.getUpper() < next.getLower())
+ {
+ it.previous();
+ it.add(range);
+ return;
+ }
+ }
+
+ it.add(range);
+ }
+
+ public void add(long lower, long upper)
+ {
+ add(new Range(lower, upper));
+ }
+
+ public void add(long value)
+ {
+ add(value, value);
+ }
+
+ public void clear()
+ {
+ ranges.clear();
+ }
+
+ public String toString()
+ {
+ return ranges.toString();
+ }
+
+ public static final void main(String[] args)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(5, 10);
+ System.out.println(ranges);
+ ranges.add(15, 20);
+ System.out.println(ranges);
+ ranges.add(23, 25);
+ System.out.println(ranges);
+ ranges.add(12, 14);
+ System.out.println(ranges);
+ ranges.add(0, 1);
+ System.out.println(ranges);
+ ranges.add(3, 11);
+ System.out.println(ranges);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index b3116570e8..0c94c6df4e 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -37,11 +37,15 @@ public class Session extends Invoker
// channel may be null
Channel channel;
- // outgoing command count
- private long commandsOut = 0;
+
// XXX: incoming command count not used
// incoming command count
private long commandsIn = 0;
+ // completed incoming commands
+ private final RangeSet processed = new RangeSet();
+
+ // outgoing command count
+ private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
@@ -55,6 +59,31 @@ public class Session extends Invoker
return commandsIn;
}
+ public RangeSet getProcessed()
+ {
+ return processed;
+ }
+
+ public void processed(long command)
+ {
+ processed.add(command);
+ }
+
+ public void processed(long lower, long upper)
+ {
+ processed.add(lower, upper);
+ }
+
+ public void processed(Range range)
+ {
+ processed.add(range);
+ }
+
+ public void processed(Struct command)
+ {
+ processed(command.getId());
+ }
+
public void attach(Channel channel)
{
this.channel = channel;
@@ -63,15 +92,24 @@ public class Session extends Invoker
public Method getCommand(long id)
{
- System.out.println(id + " " + commands);
- return commands.get(id);
+ synchronized (commands)
+ {
+ return commands.get(id);
+ }
}
void complete(long lower, long upper)
{
- for (long id = lower; id <= upper; id++)
+ synchronized (commands)
{
- commands.put(id, null);
+ for (long id = lower; id <= upper; id++)
+ {
+ commands.remove(id);
+ }
+ if (commands.isEmpty())
+ {
+ commands.notifyAll();
+ }
}
}
@@ -85,8 +123,10 @@ public class Session extends Invoker
{
if (m.getEncodedTrack() == Frame.L4)
{
- long cmd = commandsOut++;
- commands.put(cmd, m);
+ synchronized (commands)
+ {
+ commands.put(commandsOut++, m);
+ }
}
channel.method(m);
}
@@ -116,6 +156,28 @@ public class Session extends Invoker
channel.end();
}
+ public void sync()
+ {
+ synchronized (commands)
+ {
+ if (!commands.isEmpty())
+ {
+ executionSync();
+ }
+
+ while (!commands.isEmpty())
+ {
+ try {
+ commands.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
protected void invoke(Method m, Handler<Struct> handler)
{
throw new UnsupportedOperationException();
diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index c2fbe6ba00..1275378b7e 100644
--- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -36,10 +36,10 @@ public abstract class SessionDelegate extends Delegate<Session>
@Override public void executionComplete(Session ssn, ExecutionComplete excmp)
{
- Range<Long>[] ranges = excmp.getRangedExecutionSet();
+ RangeSet ranges = excmp.getRangedExecutionSet();
if (ranges != null)
{
- for (Range<Long> range : ranges)
+ for (Range range : ranges)
{
ssn.complete(range.getLower(), range.getUpper());
}
@@ -47,4 +47,9 @@ public abstract class SessionDelegate extends Delegate<Session>
ssn.complete(excmp.getCumulativeExecutionMark());
}
+ @Override public void executionSync(Session ssn, ExecutionSync sync)
+ {
+ ssn.executionComplete(0, ssn.getProcessed());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
index 534f075dbd..8c8c2c890d 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -58,6 +58,7 @@ class ToyBroker extends SessionDelegate
{
queues.put(qd.getQueue(), new LinkedList());
System.out.println("declared queue: " + qd.getQueue());
+ ssn.processed(qd);
}
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
@@ -120,6 +121,7 @@ class ToyBroker extends SessionDelegate
queue.offer(m);
System.out.println("queued " + m);
}
+ ssn.processed(xfr);
xfr = null;
frames = null;
}
@@ -133,8 +135,8 @@ class ToyBroker extends SessionDelegate
}
else
{
- long id = xfr.getId();
- Range[] ranges = {new Range<Long>(id, id)};
+ RangeSet ranges = new RangeSet();
+ ranges.add(xfr.getId());
ssn.messageReject(ranges, 0, "no such destination");
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
index 1e57de3265..db03d30605 100644
--- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -32,7 +32,7 @@ class ToyClient extends SessionDelegate
@Override public void messageReject(Session ssn, MessageReject reject)
{
- for (Range<Long> range : reject.getTransfers())
+ for (Range range : reject.getTransfers())
{
for (long l = range.getLower(); l <= range.getUpper(); l++)
{
@@ -40,6 +40,7 @@ class ToyClient extends SessionDelegate
ssn.getCommand((int) l));
}
}
+ ssn.processed(reject);
}
public void headers(Session ssn, Struct ... headers)
@@ -73,6 +74,7 @@ class ToyClient extends SessionDelegate
ssn.sessionOpen(1234);
ssn.queueDeclare("asdf", null, null);
+ ssn.sync();
ssn.messageTransfer("asdf", (short) 0, (short) 1);
ssn.headers(new DeliveryProperties(),