diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-08-08 04:36:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-08-08 04:36:31 +0000 |
| commit | dff8b876caa9625dc0e03e03dadad22a504826d1 (patch) | |
| tree | 6afa9f8388c40603fc0423e96e2555a9455b83d6 /java/common/src | |
| parent | a45694048d1f26e0ed317f661b464bae862fb8fa (diff) | |
| download | qpid-python-dff8b876caa9625dc0e03e03dadad22a504826d1.tar.gz | |
implemented Session.sync()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
10 files changed, 242 insertions, 27 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java index 9db419537c..ca728812bd 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java @@ -143,7 +143,7 @@ abstract class AbstractDecoder implements Decoder return null; } - public Range<Long>[] readRfc1982LongSet() + public RangeSet readRfc1982LongSet() { int count = readShort()/8; if (count == 0) @@ -152,10 +152,10 @@ abstract class AbstractDecoder implements Decoder } else { - Range<Long>[] ranges = new Range[count]; + RangeSet ranges = new RangeSet(); for (int i = 0; i < count; i++) { - ranges[i] = new Range<Long>(readLong(), readLong()); + ranges.add(readLong(), readLong()); } return ranges; } diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java index f898d759d5..ecd274615b 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java @@ -150,7 +150,7 @@ abstract class AbstractEncoder implements Encoder //throw new Error("TODO"); } - public void writeRfc1982LongSet(Range<Long>[] ranges) + public void writeRfc1982LongSet(RangeSet ranges) { if (ranges == null) { @@ -158,8 +158,8 @@ abstract class AbstractEncoder implements Encoder } else { - writeShort(ranges.length * 8); - for (Range<Long> range : ranges) + writeShort(ranges.size() * 8); + for (Range range : ranges) { writeLong(range.getLower()); writeLong(range.getUpper()); diff --git a/java/common/src/main/java/org/apache/qpidity/Decoder.java b/java/common/src/main/java/org/apache/qpidity/Decoder.java index 0a869baab7..13c3d0b7b8 100644 --- a/java/common/src/main/java/org/apache/qpidity/Decoder.java +++ b/java/common/src/main/java/org/apache/qpidity/Decoder.java @@ -44,7 +44,7 @@ public interface Decoder String readLongstr(); Map<String,?> readTable(); - Range<Long>[] readRfc1982LongSet(); + RangeSet readRfc1982LongSet(); UUID readUuid(); String readContent(); diff --git a/java/common/src/main/java/org/apache/qpidity/Encoder.java b/java/common/src/main/java/org/apache/qpidity/Encoder.java index c25b96e462..990cef7081 100644 --- a/java/common/src/main/java/org/apache/qpidity/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/Encoder.java @@ -44,7 +44,7 @@ public interface Encoder void writeLongstr(String s); void writeTable(Map<String,?> table); - void writeRfc1982LongSet(Range<Long>[] ranges); + void writeRfc1982LongSet(RangeSet ranges); void writeUuid(UUID uuid); void writeContent(String c); diff --git a/java/common/src/main/java/org/apache/qpidity/Range.java b/java/common/src/main/java/org/apache/qpidity/Range.java index 4766ecb471..9da7112a6d 100644 --- a/java/common/src/main/java/org/apache/qpidity/Range.java +++ b/java/common/src/main/java/org/apache/qpidity/Range.java @@ -20,6 +20,8 @@ */ package org.apache.qpidity; +import static java.lang.Math.*; + /** * Range @@ -27,25 +29,57 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -public class Range<C extends Comparable> +public class Range { - private final C lower; - private final C upper; + private final long lower; + private final long upper; - public Range(C lower, C upper) + public Range(long lower, long upper) { this.lower = lower; this.upper = upper; } - public C getLower() + public long getLower() { return lower; } - public C getUpper() + public long getUpper() { return upper; } + public boolean includes(long value) + { + return lower <= value && value <= upper; + } + + public boolean includes(Range range) + { + return includes(range.lower) && includes(range.upper); + } + + public boolean intersects(Range range) + { + return (includes(range.lower) || includes(range.upper) || + range.includes(lower) || range.includes(upper)); + } + + public boolean touches(Range range) + { + return (includes(range.upper + 1) || includes(range.lower - 1) || + range.includes(upper + 1) || range.includes(lower - 1)); + } + + public Range span(Range range) + { + return new Range(min(lower, range.lower), max(upper, range.upper)); + } + + public String toString() + { + return "[" + lower + ", " + upper + "]"; + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/RangeSet.java new file mode 100644 index 0000000000..297df7c0c9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/RangeSet.java @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.util.Collection; +import java.util.Iterator; +import java.util.ListIterator; +import java.util.LinkedList; + + +/** + * RangeSet + * + * @author Rafael H. Schloming + */ + +public class RangeSet implements Iterable<Range> +{ + + private LinkedList<Range> ranges = new LinkedList<Range>(); + + public int size() + { + return ranges.size(); + } + + public Iterator<Range> iterator() + { + return ranges.iterator(); + } + + public void add(Range range) + { + ListIterator<Range> it = ranges.listIterator(); + + while (it.hasNext()) + { + Range next = it.next(); + if (range.touches(next)) + { + it.remove(); + range = range.span(next); + } + else if (range.getUpper() < next.getLower()) + { + it.previous(); + it.add(range); + return; + } + } + + it.add(range); + } + + public void add(long lower, long upper) + { + add(new Range(lower, upper)); + } + + public void add(long value) + { + add(value, value); + } + + public void clear() + { + ranges.clear(); + } + + public String toString() + { + return ranges.toString(); + } + + public static final void main(String[] args) + { + RangeSet ranges = new RangeSet(); + ranges.add(5, 10); + System.out.println(ranges); + ranges.add(15, 20); + System.out.println(ranges); + ranges.add(23, 25); + System.out.println(ranges); + ranges.add(12, 14); + System.out.println(ranges); + ranges.add(0, 1); + System.out.println(ranges); + ranges.add(3, 11); + System.out.println(ranges); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index b3116570e8..0c94c6df4e 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -37,11 +37,15 @@ public class Session extends Invoker // channel may be null Channel channel; - // outgoing command count - private long commandsOut = 0; + // XXX: incoming command count not used // incoming command count private long commandsIn = 0; + // completed incoming commands + private final RangeSet processed = new RangeSet(); + + // outgoing command count + private long commandsOut = 0; private Map<Long,Method> commands = new HashMap<Long,Method>(); private long mark = 0; @@ -55,6 +59,31 @@ public class Session extends Invoker return commandsIn; } + public RangeSet getProcessed() + { + return processed; + } + + public void processed(long command) + { + processed.add(command); + } + + public void processed(long lower, long upper) + { + processed.add(lower, upper); + } + + public void processed(Range range) + { + processed.add(range); + } + + public void processed(Struct command) + { + processed(command.getId()); + } + public void attach(Channel channel) { this.channel = channel; @@ -63,15 +92,24 @@ public class Session extends Invoker public Method getCommand(long id) { - System.out.println(id + " " + commands); - return commands.get(id); + synchronized (commands) + { + return commands.get(id); + } } void complete(long lower, long upper) { - for (long id = lower; id <= upper; id++) + synchronized (commands) { - commands.put(id, null); + for (long id = lower; id <= upper; id++) + { + commands.remove(id); + } + if (commands.isEmpty()) + { + commands.notifyAll(); + } } } @@ -85,8 +123,10 @@ public class Session extends Invoker { if (m.getEncodedTrack() == Frame.L4) { - long cmd = commandsOut++; - commands.put(cmd, m); + synchronized (commands) + { + commands.put(commandsOut++, m); + } } channel.method(m); } @@ -116,6 +156,28 @@ public class Session extends Invoker channel.end(); } + public void sync() + { + synchronized (commands) + { + if (!commands.isEmpty()) + { + executionSync(); + } + + while (!commands.isEmpty()) + { + try { + commands.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + } + protected void invoke(Method m, Handler<Struct> handler) { throw new UnsupportedOperationException(); diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index c2fbe6ba00..1275378b7e 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -36,10 +36,10 @@ public abstract class SessionDelegate extends Delegate<Session> @Override public void executionComplete(Session ssn, ExecutionComplete excmp) { - Range<Long>[] ranges = excmp.getRangedExecutionSet(); + RangeSet ranges = excmp.getRangedExecutionSet(); if (ranges != null) { - for (Range<Long> range : ranges) + for (Range range : ranges) { ssn.complete(range.getLower(), range.getUpper()); } @@ -47,4 +47,9 @@ public abstract class SessionDelegate extends Delegate<Session> ssn.complete(excmp.getCumulativeExecutionMark()); } + @Override public void executionSync(Session ssn, ExecutionSync sync) + { + ssn.executionComplete(0, ssn.getProcessed()); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 534f075dbd..8c8c2c890d 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -58,6 +58,7 @@ class ToyBroker extends SessionDelegate { queues.put(qd.getQueue(), new LinkedList()); System.out.println("declared queue: " + qd.getQueue()); + ssn.processed(qd); } @Override public void messageTransfer(Session ssn, MessageTransfer xfr) @@ -120,6 +121,7 @@ class ToyBroker extends SessionDelegate queue.offer(m); System.out.println("queued " + m); } + ssn.processed(xfr); xfr = null; frames = null; } @@ -133,8 +135,8 @@ class ToyBroker extends SessionDelegate } else { - long id = xfr.getId(); - Range[] ranges = {new Range<Long>(id, id)}; + RangeSet ranges = new RangeSet(); + ranges.add(xfr.getId()); ssn.messageReject(ranges, 0, "no such destination"); } } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 1e57de3265..db03d30605 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -32,7 +32,7 @@ class ToyClient extends SessionDelegate @Override public void messageReject(Session ssn, MessageReject reject) { - for (Range<Long> range : reject.getTransfers()) + for (Range range : reject.getTransfers()) { for (long l = range.getLower(); l <= range.getUpper(); l++) { @@ -40,6 +40,7 @@ class ToyClient extends SessionDelegate ssn.getCommand((int) l)); } } + ssn.processed(reject); } public void headers(Session ssn, Struct ... headers) @@ -73,6 +74,7 @@ class ToyClient extends SessionDelegate ssn.sessionOpen(1234); ssn.queueDeclare("asdf", null, null); + ssn.sync(); ssn.messageTransfer("asdf", (short) 0, (short) 1); ssn.headers(new DeliveryProperties(), |
