diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-08-09 02:26:18 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-08-09 02:26:18 +0000 |
| commit | ce1474162e775b693383ed55ebb6dacf562baaa5 (patch) | |
| tree | d700eaeb06f26a096b926d27e67c8abf49dedc36 /java | |
| parent | ba57e373864d44cfae17ec8c2c9de7a55f0b4113 (diff) | |
| download | qpid-python-ce1474162e775b693383ed55ebb6dacf562baaa5.tar.gz | |
updated the amqp.0-10-preview.xml to reflect the latest votes, and added support for execution results
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@564077 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
9 files changed, 199 insertions, 16 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java index 6a36d694c6..ea9e4c067b 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSessionDelegate.java @@ -1,7 +1,6 @@ package org.apache.qpidity.impl; import org.apache.qpidity.CommonSessionDelegate; -import org.apache.qpidity.ExchangeQueryOk; import org.apache.qpidity.client.Session; @@ -43,10 +42,4 @@ public class ClientSessionDelegate extends CommonSessionDelegate l.endData(); }*/ - - // -------------------------------------------- - // Exchange related functionality - // -------------------------------------------- - public void exchangeQueryOk(Session session, ExchangeQueryOk struct) {} - } diff --git a/java/common/generate b/java/common/generate index f0a91e9d50..35b42e1207 100755 --- a/java/common/generate +++ b/java/common/generate @@ -103,7 +103,8 @@ OPTIONS = {} class Struct: - def __init__(self, name, base, type, track, content): + def __init__(self, node, name, base, type, track, content): + self.node = node self.name = name self.base = base self.type = type @@ -111,6 +112,16 @@ class Struct: self.content = content self.fields = [] + def result(self): + r = self.node["result"] + if not r: return + name = r["@domain"] + if not name: + name = self.name + "Result" + else: + name = camel(0, name) + return name + def field(self, type, name): self.fields.append((type, name)) @@ -265,6 +276,7 @@ class Visitor(mllib.transforms.Visitor): name = camel(0, m.parent["@name"], m["@name"]) type = int(m.parent["@index"])*256 + int(m["@index"]) self.structs.append((name, "Method", type, m)) + self.descend(m) def do_domain(self, d): s = d["struct"] @@ -276,6 +288,15 @@ class Visitor(mllib.transforms.Visitor): else: type = int(st) self.structs.append((name, "Struct", type, s)) + self.descend(d) + + def do_result(self, r): + s = r["struct"] + if s: + name = camel(0, r.parent.parent["@name"], r.parent["@name"], "Result") + type = int(r.parent.parent["@index"]) * 256 + int(s["@type"]) + self.structs.append((name, "Result", type, s)) + self.descend(r) v = Visitor() spec.dispatch(v) @@ -284,7 +305,7 @@ opts = Output(out_dir, out_pkg, "Option") opts.line("public enum Option {") structs = [] for name, base, typecode, m in v.structs: - struct = Struct(name, base, typecode, + struct = Struct(m, name, base, typecode, TRACKS.get(m.parent["@name"], "Frame.L4"), m["@content"] == "1") for f in m.query["field", lambda f: FIELDS.get(f["@name"], True)]: @@ -297,7 +318,7 @@ for name, base, typecode, m in v.structs: OPTIONS[name] = opt_name opts.line(" %s," % opt_name) structs.append(struct) -opts.line(" %s," % "NO_OPTION") +opts.line(" %s," % "NO_OPTION") opts.line("}") opts.write() @@ -333,13 +354,22 @@ inv = Output(out_dir, out_pkg, "Invoker") inv.line("public abstract class Invoker {") inv.line() inv.line(" protected abstract void invoke(Method method);") -inv.line(" protected abstract void invoke(Method method, Handler<Struct> handler);") +inv.line(" protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);") inv.line() for s in structs: if s.base != "Method": continue dname = dromedary(s.name) - inv.line(" public void %s(%s) {" % (dname, s.parameters())) - inv.line(" invoke(new %s(%s));" % (s.name, s.arguments())) + result = s.result() + if result: + result_type = "Future<%s>" % result + else: + result_type = "void" + inv.line(" public %s %s(%s) {" % (result_type, dname, s.parameters())) + if result: + inv.line(" return invoke(new %s(%s), %s.class);" % + (s.name, s.arguments(), result)) + else: + inv.line(" invoke(new %s(%s));" % (s.name, s.arguments())) inv.line(" }") inv.line("}") inv.write() diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 8cd07f002a..f20c65e467 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -205,7 +205,7 @@ class Channel extends Invoker implements Handler<Frame> method(m); } - protected void invoke(Method m, Handler<Struct> handler) + protected <T> Future<T> invoke(Method m, Class<T> cls) { throw new UnsupportedOperationException(); } diff --git a/java/common/src/main/java/org/apache/qpidity/Future.java b/java/common/src/main/java/org/apache/qpidity/Future.java new file mode 100644 index 0000000000..8902446e95 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/Future.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * Future + * + * @author Rafael H. Schloming + */ + +public interface Future<T> +{ + + T get(); + + boolean isDone(); + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Result.java b/java/common/src/main/java/org/apache/qpidity/Result.java new file mode 100644 index 0000000000..7fe6c869a4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/Result.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * Result + * + * @author Rafael H. Schloming + */ + +public abstract class Result extends Struct {} diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index c8b3c7c5bb..3b33fde2df 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -23,6 +23,8 @@ package org.apache.qpidity; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; + + /** * Session * @@ -217,9 +219,86 @@ public class Session extends Invoker } } - protected void invoke(Method m, Handler<Struct> handler) + private Map<Long,ResultFuture<?>> results = + new HashMap<Long,ResultFuture<?>>(); + + void result(long command, Struct result) + { + ResultFuture<?> future; + synchronized (results) + { + future = results.remove(command); + } + future.set(result); + } + + protected <T> Future<T> invoke(Method m, Class<T> klass) + { + long command = commandsOut; + ResultFuture<T> future = new ResultFuture<T>(klass); + synchronized (results) + { + results.put(command, future); + } + invoke(m); + return future; + } + + private class ResultFuture<T> implements Future<T> { - throw new UnsupportedOperationException(); + + private final Class<T> klass; + private T result; + + private ResultFuture(Class<T> klass) + { + this.klass = klass; + } + + private void set(Struct result) + { + synchronized (this) + { + this.result = klass.cast(result); + notifyAll(); + } + } + + public T get(long timeout, int nanos) + { + synchronized (this) + { + while (!isDone()) + { + try + { + wait(timeout, nanos); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + return result; + } + + public T get(long timeout) + { + return get(timeout, 0); + } + + public T get() + { + return get(0); + } + + public boolean isDone() + { + return result != null; + } + } } diff --git a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index 5d62a57e93..1287e3dc1a 100644 --- a/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -34,6 +34,11 @@ public abstract class SessionDelegate extends Delegate<Session> public abstract void data(Session ssn, Frame frame); + @Override public void executionResult(Session ssn, ExecutionResult result) + { + ssn.result(result.getCommandId(), result.getData()); + } + @Override public void executionComplete(Session ssn, ExecutionComplete excmp) { RangeSet ranges = excmp.getRangedExecutionSet(); diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java index 4a33122d37..651241f63c 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -60,6 +60,12 @@ class ToyBroker extends SessionDelegate System.out.println("declared queue: " + qd.getQueue()); } + @Override public void queueQuery(Session ssn, QueueQuery qq) + { + QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue()); + ssn.executionResult(qq.getId(), result); + } + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) { this.xfr = xfr; diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java index 4ec838bc35..e325fb93be 100644 --- a/java/common/src/main/java/org/apache/qpidity/ToyClient.java +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -85,6 +85,9 @@ class ToyClient extends SessionDelegate ssn.data("this should be rejected"); ssn.endData(); ssn.sync(); + + Future<QueueQueryResult> future = ssn.queueQuery("asdf"); + System.out.println(future.get().getQueue()); } } |
