diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-08-07 03:43:42 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-08-07 03:43:42 +0000 |
| commit | 15b836801bd401565d452bb3c31e830eeb4bc0d1 (patch) | |
| tree | 328d540c5c0a566819656384e276fd4cc7eb98bb /java/common | |
| parent | cd31479023158c05208bfe4c4084bd59521123dd (diff) | |
| download | qpid-python-15b836801bd401565d452bb3c31e830eeb4bc0d1.tar.gz | |
- made generated classes mutable
- added setters to generated classes
- moved version specific aspect of generated classes to read/write methods
- simplified StructFactory
- added ConnectionDelegate
- removed DelegateResolver
- implemented content and header transfer
- implemented primitive command window
- added ToyBroker and ToyClient for testing
- removed throws QpidException from generated delegate
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563386 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
31 files changed, 962 insertions, 371 deletions
diff --git a/java/common/generate b/java/common/generate index 7ed52a23e5..ea805b62f2 100755 --- a/java/common/generate +++ b/java/common/generate @@ -9,9 +9,6 @@ out_dir=sys.argv[1] out_pkg = sys.argv[2] spec_file = sys.argv[3] spec = mllib.xml_parse(spec_file) -major = spec["amqp/@major"] -minor = spec["amqp/@minor"] -isfx = "_v%s_%s" % (major, minor) class Output: @@ -58,6 +55,13 @@ TYPES = { "long-struct": "Struct" } +TRACKS = { + "connection": "Frame.L1", + "session": "Frame.L2", + "execution": "Frame.L3", + None: None + } + def camel(offset, *args): parts = [] for a in args: @@ -99,29 +103,23 @@ OPTIONS = {} class Struct: - def __init__(self, name, base, type): + def __init__(self, name, base, type, track, content): self.name = name self.base = base self.type = type + self.track = track + self.content = content self.fields = [] def field(self, type, name): self.fields.append((type, name)) - def interface(self, out): - out.line("public interface %s extends %s {" % (self.name, self.base)) - out.line() + def impl(self, out): + out.line("public class %s extends %s {" % (self.name, self.base)) + if self.type != None: + out.line() out.line(" public static final int TYPE = %d;" % self.type) - out.line() - for type, name in self.fields: - out.line(" %s %s();" % (jtype(type), camel(1, "get", name))) - out.line() - out.line("}") - - def impl(self, out): - out.line("class %s%s extends Abstract%s implements %s {" % - (self.name, isfx, self.base, self.name)) out.line() out.line(" public int getEncodedType() {") @@ -131,23 +129,30 @@ class Struct: out.line(" return TYPE;") out.line(" }") - out.line() - for type, name in self.fields: - out.line(" private final %s %s;" % (jtype(type), name)) + if self.base == "Method": + out.line() + out.line(" public boolean hasPayload() {") + if self.content: + out.line(" return true;") + else: + out.line(" return false;") + out.line(" }") + + out.line() + out.line(" public byte getEncodedTrack() {") + out.line(" return %s;" % self.track) + out.line(" }") out.line() - out.line(" %s%s(Decoder dec) {" % (self.name, isfx)) for type, name in self.fields: - if TYPES.has_key(type): - out.line(" %s = dec.read%s();" % (name, camel(0, type))) - elif STRUCTS.has_key(type): - out.line(" %s = new %s%s(dec);" % (name, STRUCTS[type], isfx)) - else: - raise Exception("unknown type: %s" % type) - out.line(" }") + out.line(" private %s %s;" % (jtype(type), name)) + + if self.fields: + out.line() + out.line(" public %s() {}" % self.name) out.line() - out.line(" %s%s(%s) {" % (self.name, isfx, self.parameters())) + out.line(" public %s(%s) {" % (self.name, self.parameters())) opts = False for type, name in self.fields: if not OPTIONS.has_key(name): @@ -163,6 +168,7 @@ class Struct: for type, name in self.fields: if OPTIONS.has_key(name): out.line(" case %s: _%s=true; break;" % (OPTIONS[name], name)) + out.line(" case NO_OPTION: break;") out.line(' default: throw new IllegalArgumentException' '("invalid option: " + _options[i]);') out.line(" }") @@ -177,19 +183,44 @@ class Struct: out.line(" delegate.%s(context, this);" % dromedary(self.name)) out.line(" }") - out.line() for type, name in self.fields: + out.line() out.line(" public %s %s() {" % (jtype(type), camel(1, "get", name))) out.line(" return %s;" % name) out.line(" }") + out.line(" public %s %s(%s value) {" % + (self.name, camel(1, "set", name), jtype(type))) + out.line(" this.%s = value;" % name); + out.line(" return this;") + out.line(" }") + out.line(" public %s %s(%s value) {" % (self.name, name, jtype(type))) + out.line(" this.%s = value;" % name); + out.line(" return this;") + out.line(" }") + + out.line() + out.line(" public void read(Decoder dec, byte major, byte minor) {") + for type, name in self.fields: + if TYPES.has_key(type): + out.line(" %s = dec.read%s();" % (name, camel(0, type))) + elif STRUCTS.has_key(type): + out.line(" %s = new %s();" % (name, STRUCTS[type])) + out.line(" %s.read(dec, major, minor);" % name) + else: + raise Exception("unknown type: %s" % type) + out.line(" }") out.line() - out.line(" public void write(Encoder enc) {") + out.line(" public void write(Encoder enc, byte major, byte minor) {") for type, name in self.fields: if TYPES.has_key(type): out.line(" enc.write%s(%s);" % (camel(0, type), name)) elif STRUCTS.has_key(type): - out.line(" %s.write(enc);" % name) + out.line(" if (%s == null) {" % name) + out.line(" new %s().write(enc, major, minor);" % jtype(type)) + out.line(" } else {") + out.line(" %s.write(enc, major, minor);" % name) + out.line(" }") else: raise Exception("unknown type: %s" % type) out.line(" }") @@ -253,7 +284,9 @@ opts = Output(out_dir, out_pkg, "Option") opts.line("public enum Option {") structs = [] for name, base, typecode, m in v.structs: - struct = Struct(name, base, typecode) + struct = Struct(name, base, typecode, + TRACKS.get(m.parent["@name"], "Frame.L4"), + m["@content"] == "1") for f in m.query["field", lambda f: FIELDS.get(f["@name"], True)]: type = resolve(f["@domain"]) name = camel(1, f["@name"]) @@ -269,48 +302,24 @@ opts.line("}") opts.write() for s in structs: - out = Output(out_dir, out_pkg, s.name) - s.interface(out) - out.write() - iout = Output(out_dir, out_pkg, s.name + isfx) - s.impl(iout) - iout.write() + impl = Output(out_dir, out_pkg, s.name) + s.impl(impl) + impl.write() fct = Output(out_dir, out_pkg, "StructFactory") -fct.line("public interface StructFactory {") -fct.line(" Struct create(int type, Decoder dec);") -for s in structs: - fct.line() - fct.line(" %s new%s(Decoder dec);" % (s.name, s.name)) - fct.line(" %s new%s(%s);" % (s.name, s.name, s.parameters())) -fct.line("}") -fct.write() - -ifct_name = "StructFactory%s" % isfx -ifct = Output(out_dir, out_pkg, ifct_name) -ifct.line("class %s implements StructFactory {" % ifct_name) -ifct.line(" public Struct create(int type, Decoder dec) {") -ifct.line(" switch (type) {") +fct.line("class StructFactory {") +fct.line(" public static Struct create(int type) {") +fct.line(" switch (type) {") for s in structs: if s.type == None: continue - ifct.line(" case %s.TYPE:" % s.name) - ifct.line(" return new %s%s(dec);" % (s.name, isfx)) -ifct.line(" default:") -ifct.line(' throw new IllegalArgumentException("type: " + type);') -ifct.line(" }") -ifct.line(" }") - -for s in structs: - ifct.line(" public %s new%s(Decoder dec) {" % (s.name, s.name)) - ifct.line(" return new %s%s(dec);" % (s.name, isfx)) - ifct.line(" }") - - ifct.line(" public %s new%s(%s) {" % (s.name, s.name, s.parameters())) - ifct.line(" return new %s%s(%s);" % (s.name, isfx, s.arguments())) - ifct.line(" }") - -ifct.line("}"); -ifct.write() + fct.line(" case %s.TYPE:" % s.name) + fct.line(" return new %s();" % s.name) +fct.line(" default:") +fct.line(' throw new IllegalArgumentException("type: " + type);') +fct.line(" }") +fct.line(" }") +fct.line("}"); +fct.write() dlg = Output(out_dir, out_pkg, "Delegate") dlg.line("public abstract class Delegate<C> {") @@ -325,13 +334,12 @@ inv.line("public abstract class Invoker {") inv.line() inv.line(" protected abstract void invoke(Method method);") inv.line(" protected abstract void invoke(Method method, Handler<Struct> handler);") -inv.line(" protected abstract StructFactory getFactory();") inv.line() for s in structs: if s.base != "Method": continue dname = dromedary(s.name) - inv.line(" public void %s(%s) throws QpidException {" % (dname, s.parameters())) - inv.line(" invoke(getFactory().new%s(%s));" % (s.name, s.arguments())) + inv.line(" public void %s(%s) {" % (dname, s.parameters())) + inv.line(" invoke(new %s(%s));" % (s.name, s.arguments())) inv.line(" }") inv.line("}") inv.write() diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java index ee350ac343..9db419537c 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java @@ -35,17 +35,24 @@ import static org.apache.qpidity.Functions.*; abstract class AbstractDecoder implements Decoder { - private final StructFactory factory; + private final byte major; + private final byte minor; - protected AbstractDecoder(StructFactory factory) + protected AbstractDecoder(byte major, byte minor) { - this.factory = factory; + this.major = major; + this.minor = minor; } protected abstract byte get(); protected abstract void get(byte[] bytes); + protected short uget() + { + return unsigned(get()); + } + private byte bits = 0x0; private byte nbits = 0; @@ -72,38 +79,38 @@ abstract class AbstractDecoder implements Decoder public short readOctet() { clearBits(); - return unsigned(get()); + return uget(); } public int readShort() { clearBits(); - int i = get() << 8; - i |= get(); + int i = uget() << 8; + i |= uget(); return i; } public long readLong() { clearBits(); - long l = get() << 24; - l |= get() << 16; - l |= get() << 8; - l |= get(); + long l = uget() << 24; + l |= uget() << 16; + l |= uget() << 8; + l |= uget(); return l; } public long readLonglong() { clearBits(); - long l = get() << 56; - l |= get() << 48; - l |= get() << 40; - l |= get() << 32; - l |= get() << 24; - l |= get() << 16; - l |= get() << 8; - l |= get(); + long l = uget() << 56; + l |= uget() << 48; + l |= uget() << 40; + l |= uget() << 32; + l |= uget() << 24; + l |= uget() << 16; + l |= uget() << 8; + l |= uget(); return l; } @@ -138,7 +145,20 @@ abstract class AbstractDecoder implements Decoder public Range<Long>[] readRfc1982LongSet() { - throw new Error("TODO"); + int count = readShort()/8; + if (count == 0) + { + return null; + } + else + { + Range<Long>[] ranges = new Range[count]; + for (int i = 0; i < count; i++) + { + ranges[i] = new Range<Long>(readLong(), readLong()); + } + return ranges; + } } public UUID readUuid() @@ -156,8 +176,17 @@ abstract class AbstractDecoder implements Decoder public Struct readLongStruct() { long size = readLong(); - int type = readShort(); - return factory.create(type, this); + if (size == 0) + { + return null; + } + else + { + int type = readShort(); + Struct result = Struct.create(type); + result.read(this, major, minor); + return result; + } } } diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java index deca409788..f898d759d5 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java @@ -37,6 +37,15 @@ import static org.apache.qpidity.Functions.*; abstract class AbstractEncoder implements Encoder { + private final byte major; + private final byte minor; + + protected AbstractEncoder(byte major, byte minor) + { + this.major = major; + this.minor = minor; + } + protected abstract void put(byte b); protected abstract void put(ByteBuffer src); @@ -82,7 +91,7 @@ abstract class AbstractEncoder implements Encoder assert s < 0x10000; flushBits(); - put(lsb(s >> 8)); + put(lsb(s >>> 8)); put(lsb(s)); } @@ -91,22 +100,22 @@ abstract class AbstractEncoder implements Encoder assert i < 0x100000000L; flushBits(); - put(lsb(i >> 24)); - put(lsb(i >> 16)); - put(lsb(i >> 8)); + put(lsb(i >>> 24)); + put(lsb(i >>> 16)); + put(lsb(i >>> 8)); put(lsb(i)); } public void writeLonglong(long l) { flushBits(); - put(lsb(l >> 56)); - put(lsb(l >> 48)); - put(lsb(l >> 40)); - put(lsb(l >> 32)); - put(lsb(l >> 24)); - put(lsb(l >> 16)); - put(lsb(l >> 8)); + put(lsb(l >>> 56)); + put(lsb(l >>> 48)); + put(lsb(l >>> 40)); + put(lsb(l >>> 32)); + put(lsb(l >>> 24)); + put(lsb(l >>> 16)); + put(lsb(l >>> 8)); put(lsb(l)); } @@ -120,6 +129,7 @@ abstract class AbstractEncoder implements Encoder public void writeShortstr(String s) { + if (s == null) { s = ""; } if (s.length() > 255) { throw new IllegalArgumentException(s); } @@ -129,6 +139,7 @@ abstract class AbstractEncoder implements Encoder public void writeLongstr(String s) { + if (s == null) { s = ""; } writeLong(s.length()); put(ByteBuffer.wrap(s.getBytes())); } @@ -141,13 +152,32 @@ abstract class AbstractEncoder implements Encoder public void writeRfc1982LongSet(Range<Long>[] ranges) { - throw new Error("TODO"); + if (ranges == null) + { + writeShort((short) 0); + } + else + { + writeShort(ranges.length * 8); + for (Range<Long> range : ranges) + { + writeLong(range.getLower()); + writeLong(range.getUpper()); + } + } } public void writeUuid(UUID uuid) { - writeLong(uuid.getMostSignificantBits()); - writeLong(uuid.getLeastSignificantBits()); + long msb = 0; + long lsb = 0; + if (uuid != null) + { + msb = uuid.getMostSignificantBits(); + uuid.getLeastSignificantBits(); + } + writeLong(msb); + writeLong(lsb); } public void writeContent(String c) @@ -157,12 +187,20 @@ abstract class AbstractEncoder implements Encoder public void writeLongStruct(Struct s) { - SizeEncoder sizer = new SizeEncoder(); - sizer.writeShort(s.getEncodedType()); - s.write(sizer); - writeLong(sizer.getSize()); - writeShort(s.getEncodedType()); - s.write(this); + if (s == null) + { + writeLong(0); + } + else + { + SizeEncoder sizer = new SizeEncoder(major, minor); + sizer.writeShort(s.getEncodedType()); + s.write(sizer, major, minor); + + writeLong(sizer.getSize()); + writeShort(s.getEncodedType()); + s.write(this, major, minor); + } } public void flush() diff --git a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java index 49255de4be..afbb0eddab 100644 --- a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java @@ -34,9 +34,9 @@ class BBDecoder extends AbstractDecoder private final ByteBuffer in; - public BBDecoder(StructFactory factory, ByteBuffer in) + public BBDecoder(byte major, byte minor, ByteBuffer in) { - super(factory); + super(major, minor); this.in = in; } diff --git a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java index 947a461ab3..3e0a0bb2ba 100644 --- a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java @@ -34,7 +34,8 @@ class BBEncoder extends AbstractEncoder private final ByteBuffer out; - public BBEncoder(ByteBuffer out) { + public BBEncoder(byte major, byte minor, ByteBuffer out) { + super(major, minor); this.out = out; } diff --git a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java b/java/common/src/main/java/org/apache/qpidity/BodyHandler.java index 0b772c6f04..26e654d3a7 100644 --- a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/BodyHandler.java @@ -27,12 +27,20 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class BodyHandler<C> implements Handler<Event<C,Frame>> +class BodyHandler implements Handler<Event<Session,Frame>> { - public void handle(Event<C,Frame> event) + private final SessionDelegate delegate; + + public BodyHandler(SessionDelegate delegate) + { + this.delegate = delegate; + } + + public void handle(Event<Session,Frame> event) { System.out.println("got body frame: " + event.target); + delegate.data(event.context, event.target); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index 4f23112da3..2b25d7287b 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -20,6 +20,14 @@ */ package org.apache.qpidity; +import java.nio.ByteBuffer; + +import java.util.List; +import java.util.ArrayList; + +import static org.apache.qpidity.Frame.*; + + /** * Channel * @@ -32,27 +40,41 @@ class Channel extends Invoker implements Handler<Frame> final private Connection connection; final private int channel; final private TrackSwitch<Channel> tracks; + final private Delegate<Channel> delegate; // session may be null private Session session; - public Channel(Connection connection, int channel) + private Method method = null; + private List<ByteBuffer> data = null; + private int dataSize; + + public Channel(Connection connection, int channel, SessionDelegate delegate) { this.connection = connection; this.channel = channel; - - DelegateResolver<Channel> chDR = - new SimpleDelegateResolver<Channel>(new ChannelDelegate()); - DelegateResolver<Session> ssnDR = - new SimpleDelegateResolver<Session>(new SessionDelegate()); + this.delegate = new ChannelDelegate(); tracks = new TrackSwitch<Channel>(); - tracks.map(Frame.L1, new MethodHandler<Channel>(getFactory(), chDR)); - tracks.map(Frame.L2, new MethodHandler<Channel>(getFactory(), chDR)); - tracks.map(Frame.L3, new SessionResolver<Frame> - (new MethodHandler<Session>(getFactory(), ssnDR))); - tracks.map(Frame.L4, new SessionResolver<Frame> - (new ContentHandler<Session>(getFactory(), ssnDR))); + tracks.map(L1, new MethodHandler<Channel> + (getMajor(), getMinor(), this.delegate)); + tracks.map(L2, new MethodHandler<Channel> + (getMajor(), getMinor(), this.delegate)); + tracks.map(L3, new SessionResolver<Frame> + (new MethodHandler<Session> + (getMajor(), getMinor(), delegate))); + tracks.map(L4, new SessionResolver<Frame> + (new ContentHandler(getMajor(), getMinor(), delegate))); + } + + public byte getMajor() + { + return connection.getMajor(); + } + + public byte getMinor() + { + return connection.getMinor(); } public int getEncodedChannel() { @@ -74,35 +96,108 @@ class Channel extends Invoker implements Handler<Frame> tracks.handle(new Event<Channel,Frame>(this, frame)); } - public void write(Method m) + private SegmentEncoder newEncoder(byte flags, byte track, byte type, int size) { - SizeEncoder sizer = new SizeEncoder(); + return new SegmentEncoder(getMajor(), + getMinor(), + connection.getOutputHandler(), + connection.getMaxFrame(), + (byte) (flags | VERSION), + track, + type, + channel, + size); + } + + public void method(Method m) + { + SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor()); sizer.writeLong(m.getEncodedType()); - m.write(sizer); + m.write(sizer, getMajor(), getMinor()); sizer.flush(); int size = sizer.getSize(); - // XXX: need to set header flags properly - SegmentEncoder enc = new SegmentEncoder(connection.getOutputHandler(), - connection.getMaxFrame(), - (byte) 0x0, - m.getEncodedTrack(), - m.getSegmentType(), - channel, - size); + byte flags = FIRST_SEG; + + if (!m.hasPayload()) + { + flags |= LAST_SEG; + } + + SegmentEncoder enc = newEncoder(flags, m.getEncodedTrack(), + m.getSegmentType(), size); enc.writeLong(m.getEncodedType()); - m.write(enc); + m.write(enc, getMajor(), getMinor()); enc.flush(); + + if (m.hasPayload()) + { + method = m; + } + } + + public void headers(Struct ... headers) + { + if (method == null) + { + throw new IllegalStateException("cannot write headers without method"); + } + + SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor()); + for (Struct hdr : headers) + { + sizer.writeLongStruct(hdr); + } + + SegmentEncoder enc = newEncoder((byte) 0x0, + method.getEncodedTrack(), + HEADER, + sizer.getSize()); + for (Struct hdr : headers) + { + enc.writeLongStruct(hdr); + enc.flush(); + } + } + + public void data(ByteBuffer buf) + { + if (data == null) + { + data = new ArrayList<ByteBuffer>(); + dataSize = 0; + } + data.add(buf); + dataSize += buf.remaining(); } - protected StructFactory getFactory() + public void data(String str) { - return connection.getFactory(); + data(str.getBytes()); + } + + public void data(byte[] bytes) + { + data(ByteBuffer.wrap(bytes)); + } + + public void end() + { + byte flags = LAST_SEG; + SegmentEncoder enc = newEncoder(flags, method.getEncodedTrack(), + BODY, dataSize); + for (ByteBuffer buf : data) + { + enc.put(buf); + } + enc.flush(); + data = null; + dataSize = 0; } protected void invoke(Method m) { - write(m); + method(m); } protected void invoke(Method m, Handler<Struct> handler) diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java b/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java index 258c33d715..d486868621 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java +++ b/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java @@ -20,26 +20,32 @@ */ package org.apache.qpidity; +import java.util.UUID; + /** - * AbstractMethod + * ChannelDelegate * * @author Rafael H. Schloming */ -abstract class AbstractMethod extends AbstractStruct implements Method { +class ChannelDelegate extends Delegate<Channel> +{ - public byte getEncodedTrack() + public @Override void sessionOpen(Channel channel, SessionOpen open) { - // XXX - return Frame.L2; + Session ssn = new Session(); + ssn.attach(channel); + long lifetime = open.getDetachedLifetime(); + System.out.println("Session Opened lifetime = " + lifetime); + ssn.sessionAttached(UUID.randomUUID(), lifetime); } - // XXX: do we need a segment base type? - public byte getSegmentType() + public @Override void sessionAttached(Channel channel, + SessionAttached attached) { - // XXX - return Frame.METHOD; + System.out.println("Session attached: " + attached.getSessionId() + ", " + + attached.getDetachedLifetime()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index 7e31ca9b57..9171208a28 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -41,17 +41,27 @@ class Connection implements ProtocolActions final private Handler<ByteBuffer> input; final private Handler<ByteBuffer> output; + final private ConnectionDelegate delegate; final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); - private StructFactory factory; - + // XXX: hardcoded versions + private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10); // XXX private int maxFrame = 64*1024; - public Connection(Handler<ByteBuffer> output) + public Connection(Handler<ByteBuffer> output, + ConnectionDelegate delegate, + InputHandler.State state) { - this.input = new InputHandler(this); + this.input = new InputHandler(this, state); this.output = output; + this.delegate = delegate; + } + + public Connection(Handler<ByteBuffer> output, + ConnectionDelegate delegate) + { + this(output, delegate, InputHandler.State.PROTO_HDR); } public Handler<ByteBuffer> getInputHandler() @@ -64,9 +74,19 @@ class Connection implements ProtocolActions return output; } - public StructFactory getFactory() + public ProtocolHeader getHeader() + { + return header; + } + + public byte getMajor() { - return factory; + return header.getMajor(); + } + + public byte getMinor() + { + return header.getMinor(); } public int getMaxFrame() @@ -74,35 +94,31 @@ class Connection implements ProtocolActions return maxFrame; } - public void init(ProtocolHeader header) + public void init(ProtocolHeader hdr) { System.out.println(header); - // XXX: hardcoded versions - if (header.getMajor() != 0 && header.getMinor() != 10) + if (hdr.getMajor() != header.getMajor() && + hdr.getMinor() != header.getMinor()) { - ByteBuffer buf = ByteBuffer.allocate(8); - buf.put("AMQP".getBytes()); - buf.put((byte) 1); - buf.put((byte) 1); - buf.put((byte) 0); - buf.put((byte) 10); - buf.flip(); - output.handle(buf); + output.handle(header.toByteBuffer()); // XXX: how do we close the connection? - } else { - factory = new StructFactory_v0_10(); } } - public void frame(Frame frame) + public Channel getChannel(int number) { - Channel channel = channels.get(frame.getChannel()); + Channel channel = channels.get(number); if (channel == null) { - channel = new Channel(this, frame.getChannel()); - channels.put(frame.getChannel(), channel); + channel = new Channel(this, number, delegate.getSessionDelegate()); + channels.put(number, channel); } + return channel; + } + public void frame(Frame frame) + { + Channel channel = getChannel(frame.getChannel()); channel.handle(frame); } diff --git a/java/common/src/main/java/org/apache/qpidity/Writable.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java index 9554da880a..9df264561c 100644 --- a/java/common/src/main/java/org/apache/qpidity/Writable.java +++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java @@ -20,15 +20,16 @@ */ package org.apache.qpidity; + /** - * Writable + * ConnectionDelegate * * @author Rafael H. Schloming */ -interface Writable +public interface ConnectionDelegate { - void write(Encoder enc); + SessionDelegate getSessionDelegate(); } diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java index 241d265bc4..435e377c2f 100644 --- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java @@ -29,15 +29,17 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class ContentHandler<C> extends TypeSwitch<C> +class ContentHandler extends TypeSwitch<Session> { - public ContentHandler(StructFactory factory, DelegateResolver<C> resolver) + public ContentHandler(byte major, byte minor, SessionDelegate delegate) { - MethodDispatcher<C> md = new MethodDispatcher<C>(factory, resolver); - map(Frame.METHOD, new SegmentAssembler<C>(md)); - map(Frame.HEADER, new SegmentAssembler<C>(new HeaderHandler<C>())); - map(Frame.BODY, new BodyHandler<C>()); + MethodDispatcher<Session> md = + new MethodDispatcher<Session>(major, minor, delegate); + map(Frame.METHOD, new SegmentAssembler<Session>(md)); + map(Frame.HEADER, new SegmentAssembler<Session> + (new HeaderHandler(major, minor, delegate))); + map(Frame.BODY, new BodyHandler(delegate)); } } diff --git a/java/common/src/main/java/org/apache/qpidity/DelegateResolver.java b/java/common/src/main/java/org/apache/qpidity/DelegateResolver.java deleted file mode 100644 index 632781847e..0000000000 --- a/java/common/src/main/java/org/apache/qpidity/DelegateResolver.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpidity; - - -/** - * A DelegateResolver locates the desired delegate to use for handling - * a given Struct. - * - * @author Rafael H. Schloming - */ - -interface DelegateResolver<C> -{ - - Delegate<C> resolve(Struct struct); - -} diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractStruct.java b/java/common/src/main/java/org/apache/qpidity/Encodable.java index 6e70fe3db4..6a734a1791 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractStruct.java +++ b/java/common/src/main/java/org/apache/qpidity/Encodable.java @@ -22,9 +22,16 @@ package org.apache.qpidity; /** - * AbstractStruct + * Encodable * * @author Rafael H. Schloming */ -public abstract class AbstractStruct implements Struct {} +public interface Encodable +{ + + void write(Encoder enc, byte major, byte minor); + + void read(Decoder dec, byte major, byte minor); + +} diff --git a/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java index 64ee4ef6ce..b389d54390 100644 --- a/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java @@ -40,23 +40,42 @@ class FragmentDecoder extends AbstractDecoder private final Iterator<ByteBuffer> fragments; private ByteBuffer current; - public FragmentDecoder(StructFactory factory, Iterator<ByteBuffer> fragments) + public FragmentDecoder(byte major, byte minor, Iterator<ByteBuffer> fragments) { - super(factory); + super(major, minor); this.fragments = fragments; this.current = null; } - private void preRead() + public boolean hasRemaining() { - if (current == null) + advance(); + return current != null || fragments.hasNext(); + } + + private void advance() + { + while (current == null && fragments.hasNext()) { - if (!fragments.hasNext()) + current = fragments.next(); + if (current.hasRemaining()) + { + break; + } + else { - throw new BufferUnderflowException(); + current = null; } + } + } - current = fragments.next(); + private void preRead() + { + advance(); + + if (current == null) + { + throw new BufferUnderflowException(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/Frame.java index 1c8bb64fcf..d5076e0ef0 100644 --- a/java/common/src/main/java/org/apache/qpidity/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/Frame.java @@ -51,6 +51,8 @@ class Frame implements Iterable<ByteBuffer> public static final byte RESERVED = 0x0; + public static final byte VERSION = 0x0; + public static final byte FIRST_SEG = 0x8; public static final byte LAST_SEG = 0x4; public static final byte FIRST_FRAME = 0x2; diff --git a/java/common/src/main/java/org/apache/qpidity/Header.java b/java/common/src/main/java/org/apache/qpidity/Header.java index 57e04cacdf..9b6373df19 100644 --- a/java/common/src/main/java/org/apache/qpidity/Header.java +++ b/java/common/src/main/java/org/apache/qpidity/Header.java @@ -27,4 +27,4 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -public interface Header extends Struct {} +public abstract class Header extends Struct {} diff --git a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java index 8a407cd9f6..1855681756 100644 --- a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java @@ -20,6 +20,11 @@ */ package org.apache.qpidity; +import java.nio.ByteBuffer; + +import java.util.ArrayList; +import java.util.Iterator; + /** * HeaderHandler @@ -27,13 +32,33 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class HeaderHandler<C> implements Handler<Event<C,Segment>> +class HeaderHandler implements Handler<Event<Session,Segment>> { - public void handle(Event<C,Segment> event) + private static final Struct[] EMPTY_STRUCT_ARRAY = {}; + + private final byte major; + private final byte minor; + private final SessionDelegate delegate; + + public HeaderHandler(byte major, byte minor, SessionDelegate delegate) + { + this.major = major; + this.minor = minor; + this.delegate = delegate; + } + + public void handle(Event<Session,Segment> event) { System.out.println("got header segment:\n " + event.target); - + Iterator<ByteBuffer> fragments = event.target.getFragments(); + FragmentDecoder dec = new FragmentDecoder(major, minor, fragments); + ArrayList<Struct> headers = new ArrayList(); + while (dec.hasRemaining()) + { + headers.add(dec.readLongStruct()); + } + delegate.headers(event.context, headers.toArray(EMPTY_STRUCT_ARRAY)); } } diff --git a/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/InputHandler.java index 5cd4fe87c1..78cb3d1b60 100644 --- a/java/common/src/main/java/org/apache/qpidity/InputHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/InputHandler.java @@ -34,7 +34,7 @@ import static org.apache.qpidity.InputHandler.State.*; class InputHandler implements Handler<ByteBuffer> { - enum State + public enum State { PROTO_HDR, PROTO_HDR_M, diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/Method.java index ab8825743c..fb269dfb7b 100644 --- a/java/common/src/main/java/org/apache/qpidity/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/Method.java @@ -27,11 +27,18 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -interface Method extends Struct +public abstract class Method extends Struct { - byte getEncodedTrack(); + public abstract boolean hasPayload(); - byte getSegmentType(); + public abstract byte getEncodedTrack(); + + // XXX: do we need a segment base type? + public byte getSegmentType() + { + // XXX + return Frame.METHOD; + } } diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index b4545705ea..6c7389b02d 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -34,23 +34,29 @@ import java.util.Iterator; class MethodDispatcher<C> implements Handler<Event<C,Segment>> { - final private StructFactory factory; - final private DelegateResolver<C> resolver; + final private byte major; + final private byte minor; + final private Delegate<C> delegate; + // XXX: should be on session + private int count = 0; - public MethodDispatcher(StructFactory factory, DelegateResolver<C> resolver) + public MethodDispatcher(byte major, byte minor, Delegate<C> delegate) { - this.factory = factory; - this.resolver = resolver; + this.major = major; + this.minor = minor; + this.delegate = delegate; } public void handle(Event<C,Segment> event) { System.out.println("got method segment:\n " + event.target); Iterator<ByteBuffer> fragments = event.target.getFragments(); - Decoder dec = new FragmentDecoder(factory, fragments); + Decoder dec = new FragmentDecoder(major, minor, fragments); int type = (int) dec.readLong(); - Struct struct = factory.create(type, dec); - Delegate<C> delegate = resolver.resolve(struct); + Struct struct = Struct.create(type); + struct.setId(count++); + struct.read(dec, major, minor); + System.out.println("delegating " + struct + "[" + struct.getId() + "] to " + delegate); struct.delegate(event.context, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java index 8f43f4c06f..dd952fe2c2 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java @@ -32,9 +32,9 @@ package org.apache.qpidity; class MethodHandler<C> extends TypeSwitch<C> { - public MethodHandler(StructFactory factory, DelegateResolver<C> resolver) + public MethodHandler(byte major, byte minor, Delegate<C> delegate) { - MethodDispatcher md = new MethodDispatcher<C>(factory, resolver); + MethodDispatcher md = new MethodDispatcher<C>(major, minor, delegate); map(Frame.METHOD, new SegmentAssembler<C>(md)); } diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java index b54f53dd80..17bbe5c0a7 100644 --- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java @@ -22,6 +22,7 @@ package org.apache.qpidity; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; @@ -45,6 +46,15 @@ import org.apache.mina.transport.socket.nio.SocketConnector; class MinaHandler implements IoHandler { + private final ConnectionDelegate delegate; + private final InputHandler.State state; + + public MinaHandler(ConnectionDelegate delegate, InputHandler.State state) + { + this.delegate = delegate; + this.state = state; + } + public void messageReceived(IoSession ssn, Object obj) { Connection conn = (Connection) ssn.getAttachment(); @@ -62,21 +72,29 @@ class MinaHandler implements IoHandler e.printStackTrace(); } - public void sessionCreated(IoSession ssn) + public void sessionCreated(final IoSession ssn) { System.out.println("created " + ssn); } public void sessionOpened(final IoSession ssn) { + System.out.println("opened " + ssn); Connection conn = new Connection(new Handler<java.nio.ByteBuffer>() { public void handle(java.nio.ByteBuffer buf) { ssn.write(ByteBuffer.wrap(buf)); } - }); + }, + delegate, + state); ssn.setAttachment(conn); + // XXX + synchronized (ssn) + { + ssn.notifyAll(); + } } public void sessionClosed(IoSession ssn) @@ -94,30 +112,48 @@ class MinaHandler implements IoHandler { ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); if (args[0].equals("accept")) { - accept(args); + accept("0.0.0.0", 5672, SessionDelegateStub.source()); } else if (args[0].equals("connect")) { - connect(args); + connect("0.0.0.0", 5672, SessionDelegateStub.source()); } } - public static final void accept(String[] args) throws IOException + public static final void accept(String host, int port, + ConnectionDelegate delegate) + throws IOException { IoAcceptor acceptor = new SocketAcceptor(); - acceptor.bind(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler()); + acceptor.bind(new InetSocketAddress(host, port), + new MinaHandler(delegate, InputHandler.State.PROTO_HDR)); } - public static final void connect(String[] args) + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) { + MinaHandler handler = new MinaHandler(delegate, + InputHandler.State.FRAME_HDR); + SocketAddress addr = new InetSocketAddress(host, port); IoConnector connector = new SocketConnector(); - ConnectFuture cf = connector.connect(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler()); + ConnectFuture cf = connector.connect(addr, handler); cf.join(); IoSession ssn = cf.getSession(); - ByteBuffer bb = ByteBuffer.allocate(1024); - bb.put("AMQP".getBytes()); - bb.flip(); - for (int i = 0; i < 10; i++) { - ssn.write(bb); + // XXX + synchronized (ssn) + { + while (ssn.getAttachment() == null) + { + try + { + ssn.wait(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } } + Connection conn = (Connection) ssn.getAttachment(); + return conn; } } diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java index 1f12e285e4..5e397ae574 100644 --- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java @@ -20,6 +20,8 @@ */ package org.apache.qpidity; +import java.nio.ByteBuffer; + /** * ProtocolHeader @@ -30,6 +32,8 @@ package org.apache.qpidity; class ProtocolHeader { + private static final byte[] AMQP = {'A', 'M', 'Q', 'P' }; + final private byte instance; final private byte major; final private byte minor; @@ -56,6 +60,18 @@ class ProtocolHeader return minor; } + public ByteBuffer toByteBuffer() + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.put(AMQP); + buf.put((byte) 1); + buf.put((byte) 1); + buf.put((byte) 0); + buf.put((byte) 10); + buf.flip(); + return buf; + } + public String toString() { return String.format("AMQP.%d %d-%d", instance, major, minor); diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java index 7d4e5aabe4..25e29bdcf3 100644 --- a/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java @@ -49,13 +49,11 @@ class SegmentEncoder extends AbstractEncoder private ByteBuffer frame; private boolean first; - public SegmentEncoder(Handler<ByteBuffer> handler, int max, - byte flags, - byte track, - byte type, - int channel, - int remaining) + public SegmentEncoder(byte major, byte minor, Handler<ByteBuffer> handler, + int max, byte flags, byte track, byte type, + int channel, int remaining) { + super(major, minor); if (max < HEADER_SIZE + 1) { throw new IllegalArgumentException @@ -115,7 +113,7 @@ class SegmentEncoder extends AbstractEncoder } } - @Override protected void put(byte b) + @Override public void put(byte b) { preWrite(); frame.put(b); @@ -123,7 +121,7 @@ class SegmentEncoder extends AbstractEncoder postWrite(); } - @Override protected void put(ByteBuffer src) + @Override public void put(ByteBuffer src) { if (src.remaining() > remaining) { @@ -147,7 +145,8 @@ class SegmentEncoder extends AbstractEncoder buf.put("AMQP_PROTOCOL_HEADER".getBytes()); buf.flip(); - SegmentEncoder enc = new SegmentEncoder(new Handler<ByteBuffer>() + SegmentEncoder enc = new SegmentEncoder((byte) 0, (byte) 10, + new Handler<ByteBuffer>() { public void handle(ByteBuffer frame) { diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index 666d6463c0..b3116570e8 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -20,6 +20,8 @@ */ package org.apache.qpidity; +import java.nio.ByteBuffer; + import java.util.HashMap; import java.util.Map; @@ -35,80 +37,88 @@ public class Session extends Invoker // channel may be null Channel channel; - private int command_id = 0; - // XXX - final Map<Integer,Handler<Struct>> handlers = new HashMap<Integer,Handler<Struct>>(); - - public void attach(Channel channel) + // outgoing command count + private long commandsOut = 0; + // XXX: incoming command count not used + // incoming command count + private long commandsIn = 0; + private Map<Long,Method> commands = new HashMap<Long,Method>(); + private long mark = 0; + + public long getCommandsOut() { - this.channel = channel; - channel.setSession(this); + return commandsOut; } - protected void invoke(Method m) + public long getCommandsIn() { - command_id++; - channel.write(m); + return commandsIn; } - protected void invoke(Method m, Handler<Struct> handler) + public void attach(Channel channel) { - invoke(m); - handlers.put(command_id, handler); + this.channel = channel; + channel.setSession(this); } - protected StructFactory getFactory() + public Method getCommand(long id) { - return channel.getFactory(); + System.out.println(id + " " + commands); + return commands.get(id); } - // ----------------------------------------- - // Messaging Methods - // ------------------------------------------ - public void messageTransfer(String destination, Message msg, Option ... _options) throws QpidException + void complete(long lower, long upper) { - + for (long id = lower; id <= upper; id++) + { + commands.put(id, null); + } } - public void data(byte[] src) throws QpidException + void complete(long mark) { - // TODO Auto-generated method stub + complete(this.mark, mark); + this.mark = mark; } - public void endData() throws QpidException + protected void invoke(Method m) { - // TODO Auto-generated method stub + if (m.getEncodedTrack() == Frame.L4) + { + long cmd = commandsOut++; + commands.put(cmd, m); + } + channel.method(m); } - public void messageHeaders(Header... headers) throws QpidException + public void headers(Struct ... headers) { - // TODO Auto-generated method stub + channel.headers(headers); } - public void messageTransfer(String destination,Option... options) throws QpidException + public void data(ByteBuffer buf) { - // TODO Auto-generated method stub + channel.data(buf); } - public void messageAcknowledge() throws QpidException + public void data(String str) { - // TODO Auto-generated method stub + channel.data(str); } - public boolean messageAcquire() throws QpidException + public void data(byte[] bytes) { - // TODO Auto-generated method stub - return false; + channel.data(bytes); } - public void messageReject() throws QpidException + public void end() { - // TODO Auto-generated method stub + channel.end(); } - public void messageRelease() throws QpidException + protected void invoke(Method m, Handler<Struct> handler) { - // TODO Auto-generated method stub - } + throw new UnsupportedOperationException(); + } } diff --git a/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java index fe52f0c90b..c2fbe6ba00 100644 --- a/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java +++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java @@ -22,24 +22,29 @@ package org.apache.qpidity; /** - * SimpleDelegateResolver + * SessionDelegate * * @author Rafael H. Schloming */ -class SimpleDelegateResolver<C> implements DelegateResolver<C> +public abstract class SessionDelegate extends Delegate<Session> { - private final Delegate<C> delegate; + public abstract void headers(Session ssn, Struct ... headers); - public SimpleDelegateResolver(Delegate<C> delegate) - { - this.delegate = delegate; - } + public abstract void data(Session ssn, Frame frame); - public Delegate<C> resolve(Struct struct) + @Override public void executionComplete(Session ssn, ExecutionComplete excmp) { - return delegate; + Range<Long>[] ranges = excmp.getRangedExecutionSet(); + if (ranges != null) + { + for (Range<Long> range : ranges) + { + ssn.complete(range.getLower(), range.getUpper()); + } + } + ssn.complete(excmp.getCumulativeExecutionMark()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java index b6ecdd2818..141c501104 100644 --- a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java @@ -36,11 +36,12 @@ class SizeEncoder extends AbstractEncoder private int size; - public SizeEncoder() { - this(0); + public SizeEncoder(byte major, byte minor) { + this(major, minor, 0); } - public SizeEncoder(int size) { + public SizeEncoder(byte major, byte minor, int size) { + super(major, minor); this.size = size; } @@ -64,6 +65,7 @@ class SizeEncoder extends AbstractEncoder @Override public void writeShortstr(String s) { + if (s == null) { s = ""; } if (s.length() > 255) { throw new IllegalArgumentException(s); } @@ -73,6 +75,7 @@ class SizeEncoder extends AbstractEncoder @Override public void writeLongstr(String s) { + if (s == null) { s = ""; } writeLong(s.length()); size += s.length(); } diff --git a/java/common/src/main/java/org/apache/qpidity/Struct.java b/java/common/src/main/java/org/apache/qpidity/Struct.java index 63b0db2c12..16b02a72a3 100644 --- a/java/common/src/main/java/org/apache/qpidity/Struct.java +++ b/java/common/src/main/java/org/apache/qpidity/Struct.java @@ -27,9 +27,27 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -interface Struct extends Delegator, Writable +public abstract class Struct implements Delegator, Encodable { - int getEncodedType(); + public static Struct create(int type) + { + return StructFactory.create(type); + } + + // XXX: command subclass? + private long id; + + public final long getId() + { + return id; + } + + void setId(long id) + { + this.id = id; + } + + abstract int getEncodedType(); } diff --git a/java/common/src/main/java/org/apache/qpidity/Stub.java b/java/common/src/main/java/org/apache/qpidity/Stub.java index e4ac8358f3..94d5932d20 100644 --- a/java/common/src/main/java/org/apache/qpidity/Stub.java +++ b/java/common/src/main/java/org/apache/qpidity/Stub.java @@ -10,11 +10,15 @@ import static org.apache.qpidity.Option.*; public class Stub { - private static Connection conn = new Connection(new ConsoleOutput()); + private static final byte major = 0; + private static final byte minor = 10; + + private static Connection conn = new Connection(new ConsoleOutput(), + SessionDelegateStub.source()); static { - conn.init(new ProtocolHeader((byte) 1, (byte) 0, (byte) 10)); + conn.init(new ProtocolHeader((byte) 1, major, minor)); } private static void frame(byte track, byte type, boolean first, boolean last) { @@ -22,21 +26,21 @@ public class Stub { } private static void frame(byte track, byte type, boolean first, boolean last, Method m) { - SizeEncoder sizer = new SizeEncoder(); + SizeEncoder sizer = new SizeEncoder(major, minor); if (m != null) { sizer.writeLong(m.getEncodedType()); - m.write(sizer); + m.write(sizer, major, minor); sizer.flush(); } ByteBuffer buf = ByteBuffer.allocate(sizer.getSize()); if (m != null) { - Encoder enc = new BBEncoder(buf); + Encoder enc = new BBEncoder(major, minor, buf); enc.writeLong(m.getEncodedType()); - m.write(enc); + m.write(enc, major, minor); enc.flush(); } buf.flip(); - byte flags = 0; + byte flags = Frame.VERSION; if (first) { flags |= Frame.FIRST_FRAME; } if (last) { flags |= Frame.LAST_FRAME; } Frame frame = new Frame(flags, type, track, 0); @@ -45,13 +49,12 @@ public class Stub { } public static final void main(String[] args) { - StructFactory f = new StructFactory_v0_10(); - frame(Frame.L2, Frame.METHOD, true, true, f.newSessionOpen(0)); + frame(Frame.L2, Frame.METHOD, true, true, new SessionOpen(0)); frame(Frame.L4, Frame.METHOD, true, false, - f.newQueueDeclare("asdf", "alternate", null, DURABLE)); + new QueueDeclare("asdf", "alternate", null, DURABLE)); frame(Frame.L4, Frame.METHOD, false, false); frame(Frame.L3, Frame.METHOD, true, true, - f.newExchangeDeclare("exchange", "type", "alternate", null)); + new ExchangeDeclare("exchange", "type", "alternate", null)); frame(Frame.L4, Frame.METHOD, false, true); frame(Frame.L4, Frame.HEADER, true, false); frame(Frame.L4, Frame.HEADER, false, false); @@ -60,59 +63,43 @@ public class Stub { frame(Frame.L4, Frame.BODY, false, false); frame(Frame.L4, Frame.BODY, false, false); frame(Frame.L1, Frame.METHOD, true, true, - f.newExchangeDeclare("exchange", "type", "alternate", null)); + new ExchangeDeclare("exchange", "type", "alternate", null)); frame(Frame.L4, Frame.BODY, false, false); frame(Frame.L4, Frame.BODY, false, true); } } -//====: Channel and Session Delegates :=======================================// - -class ChannelDelegate extends Delegate<Channel> { +class SessionDelegateStub extends SessionDelegate { - public @Override void sessionOpen(Channel channel, SessionOpen open) { - Session ssn = new Session(); - ssn.attach(channel); - long lifetime = open.getDetachedLifetime(); - System.out.println("Session Opened lifetime = " + lifetime); - try - { - ssn.sessionAttached(UUID.randomUUID(), lifetime); - } - catch (QpidException e) + public static final ConnectionDelegate source() + { + return new ConnectionDelegate() { - throw new RuntimeException(e); - } + public SessionDelegate getSessionDelegate() + { + return new SessionDelegateStub(); + } + }; } -} - -class SessionDelegate extends Delegate<Session> { - public @Override void queueDeclare(Session session, QueueDeclare qd) { System.out.println("got a queue declare: " + qd.getQueue()); } public @Override void exchangeDeclare(Session session, ExchangeDeclare ed) { System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType()); - try - { - session.queueDeclare("asdf", "alternate", null); - } - catch(QpidException e) - { - throw new RuntimeException(e); - } + session.queueDeclare("asdf", "alternate", null); } - /* - public @Override void executionResult(Session session, ExecutionResult result) { - Handler<Struct> handler = session.handlers.get(result.getCommandId()); - if (handler != null) { - handler.handle(result.getData()); - } - } - */ + public void data(Session ssn, Frame frame) + { + System.out.println("got data: " + frame); + } + + public void headers(Session ssn, Struct ... headers) + { + System.out.println("got headers: " + headers); + } } diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java new file mode 100644 index 0000000000..534f075dbd --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java @@ -0,0 +1,195 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.io.IOException; + +import java.nio.ByteBuffer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +import static org.apache.qpidity.Functions.*; + + +/** + * ToyBroker + * + * @author Rafael H. Schloming + */ + +class ToyBroker extends SessionDelegate +{ + + private Map<String,Queue<Message>> queues; + private MessageTransfer xfr = null; + private DeliveryProperties props = null; + private Struct[] headers = null; + private List<Frame> frames = null; + + public ToyBroker(Map<String,Queue<Message>> queues) + { + this.queues = queues; + } + + @Override public void queueDeclare(Session ssn, QueueDeclare qd) + { + queues.put(qd.getQueue(), new LinkedList()); + System.out.println("declared queue: " + qd.getQueue()); + } + + @Override public void messageTransfer(Session ssn, MessageTransfer xfr) + { + this.xfr = xfr; + frames = new ArrayList(); + } + + public void headers(Session ssn, Struct ... headers) + { + if (xfr == null || frames == null) + { + ssn.connectionClose(503, "no method segment", 0, 0); + // XXX: close at our end + return; + } + + for (Struct hdr : headers) + { + if (hdr instanceof DeliveryProperties) + { + props = (DeliveryProperties) hdr; + } + } + + if (props != null && !props.getDiscardUnroutable()) + { + String dest = xfr.getDestination(); + if (!queues.containsKey(dest)) + { + reject(ssn); + } + } + + this.headers = headers; + } + + public void data(Session ssn, Frame frame) + { + if (xfr == null || frames == null) + { + ssn.connectionClose(503, "no method segment", 0, 0); + // XXX: close at our end + return; + } + + frames.add(frame); + + if (frame.isLastSegment() && frame.isLastFrame()) + { + String dest = xfr.getDestination(); + Queue queue = queues.get(dest); + if (queue == null) + { + reject(ssn); + } + else + { + Message m = new Message(headers, frames); + queue.offer(m); + System.out.println("queued " + m); + } + xfr = null; + frames = null; + } + } + + private void reject(Session ssn) + { + if (props != null && props.getDiscardUnroutable()) + { + return; + } + else + { + long id = xfr.getId(); + Range[] ranges = {new Range<Long>(id, id)}; + ssn.messageReject(ranges, 0, "no such destination"); + } + } + + private class Message + { + private final Struct[] headers; + private final List<Frame> frames; + + public Message(Struct[] headers, List<Frame> frames) + { + this.headers = headers; + this.frames = frames; + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + + if (headers != null) + { + boolean first = true; + for (Struct hdr : headers) + { + if (first) { first = false; } + else { sb.append(" "); } + sb.append(hdr); + } + } + + for (Frame f : frames) + { + for (ByteBuffer b : f) + { + sb.append(" | "); + sb.append(str(b)); + } + } + + return sb.toString(); + } + + } + + public static final void main(String[] args) throws IOException + { + final Map<String,Queue<Message>> queues = + new HashMap<String,Queue<Message>>(); + MinaHandler.accept("0.0.0.0", 5672, new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ToyBroker(queues); + } + }); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java new file mode 100644 index 0000000000..1e57de3265 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * ToyClient + * + * @author Rafael H. Schloming + */ + +class ToyClient extends SessionDelegate +{ + + @Override public void messageReject(Session ssn, MessageReject reject) + { + for (Range<Long> range : reject.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + ssn.getCommand((int) l)); + } + } + } + + public void headers(Session ssn, Struct ... headers) + { + for (Struct hdr : headers) + { + System.out.println("header: " + hdr); + } + } + + public void data(Session ssn, Frame frame) + { + System.out.println("got data: " + frame); + } + + public static final void main(String[] args) + { + Connection conn = MinaHandler.connect("0.0.0.0", 5672, + new ConnectionDelegate() + { + public SessionDelegate getSessionDelegate() + { + return new ToyClient(); + } + }); + conn.getOutputHandler().handle(conn.getHeader().toByteBuffer()); + + Channel ch = conn.getChannel(0); + Session ssn = new Session(); + ssn.attach(ch); + ssn.sessionOpen(1234); + + ssn.queueDeclare("asdf", null, null); + + ssn.messageTransfer("asdf", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties(), + new MessageProperties()); + ssn.data("this is the data"); + ssn.end(); + + ssn.messageTransfer("fdsa", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.end(); + } + +} |
