summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
Diffstat (limited to 'java/common')
-rwxr-xr-xjava/common/generate154
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java71
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java78
-rw-r--r--java/common/src/main/java/org/apache/qpidity/BBDecoder.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/BBEncoder.java3
-rw-r--r--java/common/src/main/java/org/apache/qpidity/BodyHandler.java12
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java149
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/AbstractMethod.java)24
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Connection.java62
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/Writable.java)7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ContentHandler.java14
-rw-r--r--java/common/src/main/java/org/apache/qpidity/DelegateResolver.java36
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Encodable.java (renamed from java/common/src/main/java/org/apache/qpidity/AbstractStruct.java)11
-rw-r--r--java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java33
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Frame.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Header.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/HeaderHandler.java31
-rw-r--r--java/common/src/main/java/org/apache/qpidity/InputHandler.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Method.java13
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java22
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodHandler.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MinaHandler.java62
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java16
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java17
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java86
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SessionDelegate.java (renamed from java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java)23
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SizeEncoder.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Struct.java22
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Stub.java81
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyBroker.java195
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ToyClient.java88
31 files changed, 962 insertions, 371 deletions
diff --git a/java/common/generate b/java/common/generate
index 7ed52a23e5..ea805b62f2 100755
--- a/java/common/generate
+++ b/java/common/generate
@@ -9,9 +9,6 @@ out_dir=sys.argv[1]
out_pkg = sys.argv[2]
spec_file = sys.argv[3]
spec = mllib.xml_parse(spec_file)
-major = spec["amqp/@major"]
-minor = spec["amqp/@minor"]
-isfx = "_v%s_%s" % (major, minor)
class Output:
@@ -58,6 +55,13 @@ TYPES = {
"long-struct": "Struct"
}
+TRACKS = {
+ "connection": "Frame.L1",
+ "session": "Frame.L2",
+ "execution": "Frame.L3",
+ None: None
+ }
+
def camel(offset, *args):
parts = []
for a in args:
@@ -99,29 +103,23 @@ OPTIONS = {}
class Struct:
- def __init__(self, name, base, type):
+ def __init__(self, name, base, type, track, content):
self.name = name
self.base = base
self.type = type
+ self.track = track
+ self.content = content
self.fields = []
def field(self, type, name):
self.fields.append((type, name))
- def interface(self, out):
- out.line("public interface %s extends %s {" % (self.name, self.base))
- out.line()
+ def impl(self, out):
+ out.line("public class %s extends %s {" % (self.name, self.base))
+
if self.type != None:
+ out.line()
out.line(" public static final int TYPE = %d;" % self.type)
- out.line()
- for type, name in self.fields:
- out.line(" %s %s();" % (jtype(type), camel(1, "get", name)))
- out.line()
- out.line("}")
-
- def impl(self, out):
- out.line("class %s%s extends Abstract%s implements %s {" %
- (self.name, isfx, self.base, self.name))
out.line()
out.line(" public int getEncodedType() {")
@@ -131,23 +129,30 @@ class Struct:
out.line(" return TYPE;")
out.line(" }")
- out.line()
- for type, name in self.fields:
- out.line(" private final %s %s;" % (jtype(type), name))
+ if self.base == "Method":
+ out.line()
+ out.line(" public boolean hasPayload() {")
+ if self.content:
+ out.line(" return true;")
+ else:
+ out.line(" return false;")
+ out.line(" }")
+
+ out.line()
+ out.line(" public byte getEncodedTrack() {")
+ out.line(" return %s;" % self.track)
+ out.line(" }")
out.line()
- out.line(" %s%s(Decoder dec) {" % (self.name, isfx))
for type, name in self.fields:
- if TYPES.has_key(type):
- out.line(" %s = dec.read%s();" % (name, camel(0, type)))
- elif STRUCTS.has_key(type):
- out.line(" %s = new %s%s(dec);" % (name, STRUCTS[type], isfx))
- else:
- raise Exception("unknown type: %s" % type)
- out.line(" }")
+ out.line(" private %s %s;" % (jtype(type), name))
+
+ if self.fields:
+ out.line()
+ out.line(" public %s() {}" % self.name)
out.line()
- out.line(" %s%s(%s) {" % (self.name, isfx, self.parameters()))
+ out.line(" public %s(%s) {" % (self.name, self.parameters()))
opts = False
for type, name in self.fields:
if not OPTIONS.has_key(name):
@@ -163,6 +168,7 @@ class Struct:
for type, name in self.fields:
if OPTIONS.has_key(name):
out.line(" case %s: _%s=true; break;" % (OPTIONS[name], name))
+ out.line(" case NO_OPTION: break;")
out.line(' default: throw new IllegalArgumentException'
'("invalid option: " + _options[i]);')
out.line(" }")
@@ -177,19 +183,44 @@ class Struct:
out.line(" delegate.%s(context, this);" % dromedary(self.name))
out.line(" }")
- out.line()
for type, name in self.fields:
+ out.line()
out.line(" public %s %s() {" % (jtype(type), camel(1, "get", name)))
out.line(" return %s;" % name)
out.line(" }")
+ out.line(" public %s %s(%s value) {" %
+ (self.name, camel(1, "set", name), jtype(type)))
+ out.line(" this.%s = value;" % name);
+ out.line(" return this;")
+ out.line(" }")
+ out.line(" public %s %s(%s value) {" % (self.name, name, jtype(type)))
+ out.line(" this.%s = value;" % name);
+ out.line(" return this;")
+ out.line(" }")
+
+ out.line()
+ out.line(" public void read(Decoder dec, byte major, byte minor) {")
+ for type, name in self.fields:
+ if TYPES.has_key(type):
+ out.line(" %s = dec.read%s();" % (name, camel(0, type)))
+ elif STRUCTS.has_key(type):
+ out.line(" %s = new %s();" % (name, STRUCTS[type]))
+ out.line(" %s.read(dec, major, minor);" % name)
+ else:
+ raise Exception("unknown type: %s" % type)
+ out.line(" }")
out.line()
- out.line(" public void write(Encoder enc) {")
+ out.line(" public void write(Encoder enc, byte major, byte minor) {")
for type, name in self.fields:
if TYPES.has_key(type):
out.line(" enc.write%s(%s);" % (camel(0, type), name))
elif STRUCTS.has_key(type):
- out.line(" %s.write(enc);" % name)
+ out.line(" if (%s == null) {" % name)
+ out.line(" new %s().write(enc, major, minor);" % jtype(type))
+ out.line(" } else {")
+ out.line(" %s.write(enc, major, minor);" % name)
+ out.line(" }")
else:
raise Exception("unknown type: %s" % type)
out.line(" }")
@@ -253,7 +284,9 @@ opts = Output(out_dir, out_pkg, "Option")
opts.line("public enum Option {")
structs = []
for name, base, typecode, m in v.structs:
- struct = Struct(name, base, typecode)
+ struct = Struct(name, base, typecode,
+ TRACKS.get(m.parent["@name"], "Frame.L4"),
+ m["@content"] == "1")
for f in m.query["field", lambda f: FIELDS.get(f["@name"], True)]:
type = resolve(f["@domain"])
name = camel(1, f["@name"])
@@ -269,48 +302,24 @@ opts.line("}")
opts.write()
for s in structs:
- out = Output(out_dir, out_pkg, s.name)
- s.interface(out)
- out.write()
- iout = Output(out_dir, out_pkg, s.name + isfx)
- s.impl(iout)
- iout.write()
+ impl = Output(out_dir, out_pkg, s.name)
+ s.impl(impl)
+ impl.write()
fct = Output(out_dir, out_pkg, "StructFactory")
-fct.line("public interface StructFactory {")
-fct.line(" Struct create(int type, Decoder dec);")
-for s in structs:
- fct.line()
- fct.line(" %s new%s(Decoder dec);" % (s.name, s.name))
- fct.line(" %s new%s(%s);" % (s.name, s.name, s.parameters()))
-fct.line("}")
-fct.write()
-
-ifct_name = "StructFactory%s" % isfx
-ifct = Output(out_dir, out_pkg, ifct_name)
-ifct.line("class %s implements StructFactory {" % ifct_name)
-ifct.line(" public Struct create(int type, Decoder dec) {")
-ifct.line(" switch (type) {")
+fct.line("class StructFactory {")
+fct.line(" public static Struct create(int type) {")
+fct.line(" switch (type) {")
for s in structs:
if s.type == None: continue
- ifct.line(" case %s.TYPE:" % s.name)
- ifct.line(" return new %s%s(dec);" % (s.name, isfx))
-ifct.line(" default:")
-ifct.line(' throw new IllegalArgumentException("type: " + type);')
-ifct.line(" }")
-ifct.line(" }")
-
-for s in structs:
- ifct.line(" public %s new%s(Decoder dec) {" % (s.name, s.name))
- ifct.line(" return new %s%s(dec);" % (s.name, isfx))
- ifct.line(" }")
-
- ifct.line(" public %s new%s(%s) {" % (s.name, s.name, s.parameters()))
- ifct.line(" return new %s%s(%s);" % (s.name, isfx, s.arguments()))
- ifct.line(" }")
-
-ifct.line("}");
-ifct.write()
+ fct.line(" case %s.TYPE:" % s.name)
+ fct.line(" return new %s();" % s.name)
+fct.line(" default:")
+fct.line(' throw new IllegalArgumentException("type: " + type);')
+fct.line(" }")
+fct.line(" }")
+fct.line("}");
+fct.write()
dlg = Output(out_dir, out_pkg, "Delegate")
dlg.line("public abstract class Delegate<C> {")
@@ -325,13 +334,12 @@ inv.line("public abstract class Invoker {")
inv.line()
inv.line(" protected abstract void invoke(Method method);")
inv.line(" protected abstract void invoke(Method method, Handler<Struct> handler);")
-inv.line(" protected abstract StructFactory getFactory();")
inv.line()
for s in structs:
if s.base != "Method": continue
dname = dromedary(s.name)
- inv.line(" public void %s(%s) throws QpidException {" % (dname, s.parameters()))
- inv.line(" invoke(getFactory().new%s(%s));" % (s.name, s.arguments()))
+ inv.line(" public void %s(%s) {" % (dname, s.parameters()))
+ inv.line(" invoke(new %s(%s));" % (s.name, s.arguments()))
inv.line(" }")
inv.line("}")
inv.write()
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
index ee350ac343..9db419537c 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
@@ -35,17 +35,24 @@ import static org.apache.qpidity.Functions.*;
abstract class AbstractDecoder implements Decoder
{
- private final StructFactory factory;
+ private final byte major;
+ private final byte minor;
- protected AbstractDecoder(StructFactory factory)
+ protected AbstractDecoder(byte major, byte minor)
{
- this.factory = factory;
+ this.major = major;
+ this.minor = minor;
}
protected abstract byte get();
protected abstract void get(byte[] bytes);
+ protected short uget()
+ {
+ return unsigned(get());
+ }
+
private byte bits = 0x0;
private byte nbits = 0;
@@ -72,38 +79,38 @@ abstract class AbstractDecoder implements Decoder
public short readOctet()
{
clearBits();
- return unsigned(get());
+ return uget();
}
public int readShort()
{
clearBits();
- int i = get() << 8;
- i |= get();
+ int i = uget() << 8;
+ i |= uget();
return i;
}
public long readLong()
{
clearBits();
- long l = get() << 24;
- l |= get() << 16;
- l |= get() << 8;
- l |= get();
+ long l = uget() << 24;
+ l |= uget() << 16;
+ l |= uget() << 8;
+ l |= uget();
return l;
}
public long readLonglong()
{
clearBits();
- long l = get() << 56;
- l |= get() << 48;
- l |= get() << 40;
- l |= get() << 32;
- l |= get() << 24;
- l |= get() << 16;
- l |= get() << 8;
- l |= get();
+ long l = uget() << 56;
+ l |= uget() << 48;
+ l |= uget() << 40;
+ l |= uget() << 32;
+ l |= uget() << 24;
+ l |= uget() << 16;
+ l |= uget() << 8;
+ l |= uget();
return l;
}
@@ -138,7 +145,20 @@ abstract class AbstractDecoder implements Decoder
public Range<Long>[] readRfc1982LongSet()
{
- throw new Error("TODO");
+ int count = readShort()/8;
+ if (count == 0)
+ {
+ return null;
+ }
+ else
+ {
+ Range<Long>[] ranges = new Range[count];
+ for (int i = 0; i < count; i++)
+ {
+ ranges[i] = new Range<Long>(readLong(), readLong());
+ }
+ return ranges;
+ }
}
public UUID readUuid()
@@ -156,8 +176,17 @@ abstract class AbstractDecoder implements Decoder
public Struct readLongStruct()
{
long size = readLong();
- int type = readShort();
- return factory.create(type, this);
+ if (size == 0)
+ {
+ return null;
+ }
+ else
+ {
+ int type = readShort();
+ Struct result = Struct.create(type);
+ result.read(this, major, minor);
+ return result;
+ }
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
index deca409788..f898d759d5 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
@@ -37,6 +37,15 @@ import static org.apache.qpidity.Functions.*;
abstract class AbstractEncoder implements Encoder
{
+ private final byte major;
+ private final byte minor;
+
+ protected AbstractEncoder(byte major, byte minor)
+ {
+ this.major = major;
+ this.minor = minor;
+ }
+
protected abstract void put(byte b);
protected abstract void put(ByteBuffer src);
@@ -82,7 +91,7 @@ abstract class AbstractEncoder implements Encoder
assert s < 0x10000;
flushBits();
- put(lsb(s >> 8));
+ put(lsb(s >>> 8));
put(lsb(s));
}
@@ -91,22 +100,22 @@ abstract class AbstractEncoder implements Encoder
assert i < 0x100000000L;
flushBits();
- put(lsb(i >> 24));
- put(lsb(i >> 16));
- put(lsb(i >> 8));
+ put(lsb(i >>> 24));
+ put(lsb(i >>> 16));
+ put(lsb(i >>> 8));
put(lsb(i));
}
public void writeLonglong(long l)
{
flushBits();
- put(lsb(l >> 56));
- put(lsb(l >> 48));
- put(lsb(l >> 40));
- put(lsb(l >> 32));
- put(lsb(l >> 24));
- put(lsb(l >> 16));
- put(lsb(l >> 8));
+ put(lsb(l >>> 56));
+ put(lsb(l >>> 48));
+ put(lsb(l >>> 40));
+ put(lsb(l >>> 32));
+ put(lsb(l >>> 24));
+ put(lsb(l >>> 16));
+ put(lsb(l >>> 8));
put(lsb(l));
}
@@ -120,6 +129,7 @@ abstract class AbstractEncoder implements Encoder
public void writeShortstr(String s)
{
+ if (s == null) { s = ""; }
if (s.length() > 255) {
throw new IllegalArgumentException(s);
}
@@ -129,6 +139,7 @@ abstract class AbstractEncoder implements Encoder
public void writeLongstr(String s)
{
+ if (s == null) { s = ""; }
writeLong(s.length());
put(ByteBuffer.wrap(s.getBytes()));
}
@@ -141,13 +152,32 @@ abstract class AbstractEncoder implements Encoder
public void writeRfc1982LongSet(Range<Long>[] ranges)
{
- throw new Error("TODO");
+ if (ranges == null)
+ {
+ writeShort((short) 0);
+ }
+ else
+ {
+ writeShort(ranges.length * 8);
+ for (Range<Long> range : ranges)
+ {
+ writeLong(range.getLower());
+ writeLong(range.getUpper());
+ }
+ }
}
public void writeUuid(UUID uuid)
{
- writeLong(uuid.getMostSignificantBits());
- writeLong(uuid.getLeastSignificantBits());
+ long msb = 0;
+ long lsb = 0;
+ if (uuid != null)
+ {
+ msb = uuid.getMostSignificantBits();
+ uuid.getLeastSignificantBits();
+ }
+ writeLong(msb);
+ writeLong(lsb);
}
public void writeContent(String c)
@@ -157,12 +187,20 @@ abstract class AbstractEncoder implements Encoder
public void writeLongStruct(Struct s)
{
- SizeEncoder sizer = new SizeEncoder();
- sizer.writeShort(s.getEncodedType());
- s.write(sizer);
- writeLong(sizer.getSize());
- writeShort(s.getEncodedType());
- s.write(this);
+ if (s == null)
+ {
+ writeLong(0);
+ }
+ else
+ {
+ SizeEncoder sizer = new SizeEncoder(major, minor);
+ sizer.writeShort(s.getEncodedType());
+ s.write(sizer, major, minor);
+
+ writeLong(sizer.getSize());
+ writeShort(s.getEncodedType());
+ s.write(this, major, minor);
+ }
}
public void flush()
diff --git a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java
index 49255de4be..afbb0eddab 100644
--- a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java
@@ -34,9 +34,9 @@ class BBDecoder extends AbstractDecoder
private final ByteBuffer in;
- public BBDecoder(StructFactory factory, ByteBuffer in)
+ public BBDecoder(byte major, byte minor, ByteBuffer in)
{
- super(factory);
+ super(major, minor);
this.in = in;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java
index 947a461ab3..3e0a0bb2ba 100644
--- a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java
@@ -34,7 +34,8 @@ class BBEncoder extends AbstractEncoder
private final ByteBuffer out;
- public BBEncoder(ByteBuffer out) {
+ public BBEncoder(byte major, byte minor, ByteBuffer out) {
+ super(major, minor);
this.out = out;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java b/java/common/src/main/java/org/apache/qpidity/BodyHandler.java
index 0b772c6f04..26e654d3a7 100644
--- a/java/common/src/main/java/org/apache/qpidity/BodyHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/BodyHandler.java
@@ -27,12 +27,20 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class BodyHandler<C> implements Handler<Event<C,Frame>>
+class BodyHandler implements Handler<Event<Session,Frame>>
{
- public void handle(Event<C,Frame> event)
+ private final SessionDelegate delegate;
+
+ public BodyHandler(SessionDelegate delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public void handle(Event<Session,Frame> event)
{
System.out.println("got body frame: " + event.target);
+ delegate.data(event.context, event.target);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
index 4f23112da3..2b25d7287b 100644
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/Channel.java
@@ -20,6 +20,14 @@
*/
package org.apache.qpidity;
+import java.nio.ByteBuffer;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import static org.apache.qpidity.Frame.*;
+
+
/**
* Channel
*
@@ -32,27 +40,41 @@ class Channel extends Invoker implements Handler<Frame>
final private Connection connection;
final private int channel;
final private TrackSwitch<Channel> tracks;
+ final private Delegate<Channel> delegate;
// session may be null
private Session session;
- public Channel(Connection connection, int channel)
+ private Method method = null;
+ private List<ByteBuffer> data = null;
+ private int dataSize;
+
+ public Channel(Connection connection, int channel, SessionDelegate delegate)
{
this.connection = connection;
this.channel = channel;
-
- DelegateResolver<Channel> chDR =
- new SimpleDelegateResolver<Channel>(new ChannelDelegate());
- DelegateResolver<Session> ssnDR =
- new SimpleDelegateResolver<Session>(new SessionDelegate());
+ this.delegate = new ChannelDelegate();
tracks = new TrackSwitch<Channel>();
- tracks.map(Frame.L1, new MethodHandler<Channel>(getFactory(), chDR));
- tracks.map(Frame.L2, new MethodHandler<Channel>(getFactory(), chDR));
- tracks.map(Frame.L3, new SessionResolver<Frame>
- (new MethodHandler<Session>(getFactory(), ssnDR)));
- tracks.map(Frame.L4, new SessionResolver<Frame>
- (new ContentHandler<Session>(getFactory(), ssnDR)));
+ tracks.map(L1, new MethodHandler<Channel>
+ (getMajor(), getMinor(), this.delegate));
+ tracks.map(L2, new MethodHandler<Channel>
+ (getMajor(), getMinor(), this.delegate));
+ tracks.map(L3, new SessionResolver<Frame>
+ (new MethodHandler<Session>
+ (getMajor(), getMinor(), delegate)));
+ tracks.map(L4, new SessionResolver<Frame>
+ (new ContentHandler(getMajor(), getMinor(), delegate)));
+ }
+
+ public byte getMajor()
+ {
+ return connection.getMajor();
+ }
+
+ public byte getMinor()
+ {
+ return connection.getMinor();
}
public int getEncodedChannel() {
@@ -74,35 +96,108 @@ class Channel extends Invoker implements Handler<Frame>
tracks.handle(new Event<Channel,Frame>(this, frame));
}
- public void write(Method m)
+ private SegmentEncoder newEncoder(byte flags, byte track, byte type, int size)
{
- SizeEncoder sizer = new SizeEncoder();
+ return new SegmentEncoder(getMajor(),
+ getMinor(),
+ connection.getOutputHandler(),
+ connection.getMaxFrame(),
+ (byte) (flags | VERSION),
+ track,
+ type,
+ channel,
+ size);
+ }
+
+ public void method(Method m)
+ {
+ SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor());
sizer.writeLong(m.getEncodedType());
- m.write(sizer);
+ m.write(sizer, getMajor(), getMinor());
sizer.flush();
int size = sizer.getSize();
- // XXX: need to set header flags properly
- SegmentEncoder enc = new SegmentEncoder(connection.getOutputHandler(),
- connection.getMaxFrame(),
- (byte) 0x0,
- m.getEncodedTrack(),
- m.getSegmentType(),
- channel,
- size);
+ byte flags = FIRST_SEG;
+
+ if (!m.hasPayload())
+ {
+ flags |= LAST_SEG;
+ }
+
+ SegmentEncoder enc = newEncoder(flags, m.getEncodedTrack(),
+ m.getSegmentType(), size);
enc.writeLong(m.getEncodedType());
- m.write(enc);
+ m.write(enc, getMajor(), getMinor());
enc.flush();
+
+ if (m.hasPayload())
+ {
+ method = m;
+ }
+ }
+
+ public void headers(Struct ... headers)
+ {
+ if (method == null)
+ {
+ throw new IllegalStateException("cannot write headers without method");
+ }
+
+ SizeEncoder sizer = new SizeEncoder(getMajor(), getMinor());
+ for (Struct hdr : headers)
+ {
+ sizer.writeLongStruct(hdr);
+ }
+
+ SegmentEncoder enc = newEncoder((byte) 0x0,
+ method.getEncodedTrack(),
+ HEADER,
+ sizer.getSize());
+ for (Struct hdr : headers)
+ {
+ enc.writeLongStruct(hdr);
+ enc.flush();
+ }
+ }
+
+ public void data(ByteBuffer buf)
+ {
+ if (data == null)
+ {
+ data = new ArrayList<ByteBuffer>();
+ dataSize = 0;
+ }
+ data.add(buf);
+ dataSize += buf.remaining();
}
- protected StructFactory getFactory()
+ public void data(String str)
{
- return connection.getFactory();
+ data(str.getBytes());
+ }
+
+ public void data(byte[] bytes)
+ {
+ data(ByteBuffer.wrap(bytes));
+ }
+
+ public void end()
+ {
+ byte flags = LAST_SEG;
+ SegmentEncoder enc = newEncoder(flags, method.getEncodedTrack(),
+ BODY, dataSize);
+ for (ByteBuffer buf : data)
+ {
+ enc.put(buf);
+ }
+ enc.flush();
+ data = null;
+ dataSize = 0;
}
protected void invoke(Method m)
{
- write(m);
+ method(m);
}
protected void invoke(Method m, Handler<Struct> handler)
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java b/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java
index 258c33d715..d486868621 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java
+++ b/java/common/src/main/java/org/apache/qpidity/ChannelDelegate.java
@@ -20,26 +20,32 @@
*/
package org.apache.qpidity;
+import java.util.UUID;
+
/**
- * AbstractMethod
+ * ChannelDelegate
*
* @author Rafael H. Schloming
*/
-abstract class AbstractMethod extends AbstractStruct implements Method {
+class ChannelDelegate extends Delegate<Channel>
+{
- public byte getEncodedTrack()
+ public @Override void sessionOpen(Channel channel, SessionOpen open)
{
- // XXX
- return Frame.L2;
+ Session ssn = new Session();
+ ssn.attach(channel);
+ long lifetime = open.getDetachedLifetime();
+ System.out.println("Session Opened lifetime = " + lifetime);
+ ssn.sessionAttached(UUID.randomUUID(), lifetime);
}
- // XXX: do we need a segment base type?
- public byte getSegmentType()
+ public @Override void sessionAttached(Channel channel,
+ SessionAttached attached)
{
- // XXX
- return Frame.METHOD;
+ System.out.println("Session attached: " + attached.getSessionId() + ", " +
+ attached.getDetachedLifetime());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java
index 7e31ca9b57..9171208a28 100644
--- a/java/common/src/main/java/org/apache/qpidity/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/Connection.java
@@ -41,17 +41,27 @@ class Connection implements ProtocolActions
final private Handler<ByteBuffer> input;
final private Handler<ByteBuffer> output;
+ final private ConnectionDelegate delegate;
final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
- private StructFactory factory;
-
+ // XXX: hardcoded versions
+ private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10);
// XXX
private int maxFrame = 64*1024;
- public Connection(Handler<ByteBuffer> output)
+ public Connection(Handler<ByteBuffer> output,
+ ConnectionDelegate delegate,
+ InputHandler.State state)
{
- this.input = new InputHandler(this);
+ this.input = new InputHandler(this, state);
this.output = output;
+ this.delegate = delegate;
+ }
+
+ public Connection(Handler<ByteBuffer> output,
+ ConnectionDelegate delegate)
+ {
+ this(output, delegate, InputHandler.State.PROTO_HDR);
}
public Handler<ByteBuffer> getInputHandler()
@@ -64,9 +74,19 @@ class Connection implements ProtocolActions
return output;
}
- public StructFactory getFactory()
+ public ProtocolHeader getHeader()
+ {
+ return header;
+ }
+
+ public byte getMajor()
{
- return factory;
+ return header.getMajor();
+ }
+
+ public byte getMinor()
+ {
+ return header.getMinor();
}
public int getMaxFrame()
@@ -74,35 +94,31 @@ class Connection implements ProtocolActions
return maxFrame;
}
- public void init(ProtocolHeader header)
+ public void init(ProtocolHeader hdr)
{
System.out.println(header);
- // XXX: hardcoded versions
- if (header.getMajor() != 0 && header.getMinor() != 10)
+ if (hdr.getMajor() != header.getMajor() &&
+ hdr.getMinor() != header.getMinor())
{
- ByteBuffer buf = ByteBuffer.allocate(8);
- buf.put("AMQP".getBytes());
- buf.put((byte) 1);
- buf.put((byte) 1);
- buf.put((byte) 0);
- buf.put((byte) 10);
- buf.flip();
- output.handle(buf);
+ output.handle(header.toByteBuffer());
// XXX: how do we close the connection?
- } else {
- factory = new StructFactory_v0_10();
}
}
- public void frame(Frame frame)
+ public Channel getChannel(int number)
{
- Channel channel = channels.get(frame.getChannel());
+ Channel channel = channels.get(number);
if (channel == null)
{
- channel = new Channel(this, frame.getChannel());
- channels.put(frame.getChannel(), channel);
+ channel = new Channel(this, number, delegate.getSessionDelegate());
+ channels.put(number, channel);
}
+ return channel;
+ }
+ public void frame(Frame frame)
+ {
+ Channel channel = getChannel(frame.getChannel());
channel.handle(frame);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Writable.java b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
index 9554da880a..9df264561c 100644
--- a/java/common/src/main/java/org/apache/qpidity/Writable.java
+++ b/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
@@ -20,15 +20,16 @@
*/
package org.apache.qpidity;
+
/**
- * Writable
+ * ConnectionDelegate
*
* @author Rafael H. Schloming
*/
-interface Writable
+public interface ConnectionDelegate
{
- void write(Encoder enc);
+ SessionDelegate getSessionDelegate();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
index 241d265bc4..435e377c2f 100644
--- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
@@ -29,15 +29,17 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class ContentHandler<C> extends TypeSwitch<C>
+class ContentHandler extends TypeSwitch<Session>
{
- public ContentHandler(StructFactory factory, DelegateResolver<C> resolver)
+ public ContentHandler(byte major, byte minor, SessionDelegate delegate)
{
- MethodDispatcher<C> md = new MethodDispatcher<C>(factory, resolver);
- map(Frame.METHOD, new SegmentAssembler<C>(md));
- map(Frame.HEADER, new SegmentAssembler<C>(new HeaderHandler<C>()));
- map(Frame.BODY, new BodyHandler<C>());
+ MethodDispatcher<Session> md =
+ new MethodDispatcher<Session>(major, minor, delegate);
+ map(Frame.METHOD, new SegmentAssembler<Session>(md));
+ map(Frame.HEADER, new SegmentAssembler<Session>
+ (new HeaderHandler(major, minor, delegate)));
+ map(Frame.BODY, new BodyHandler(delegate));
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/DelegateResolver.java b/java/common/src/main/java/org/apache/qpidity/DelegateResolver.java
deleted file mode 100644
index 632781847e..0000000000
--- a/java/common/src/main/java/org/apache/qpidity/DelegateResolver.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpidity;
-
-
-/**
- * A DelegateResolver locates the desired delegate to use for handling
- * a given Struct.
- *
- * @author Rafael H. Schloming
- */
-
-interface DelegateResolver<C>
-{
-
- Delegate<C> resolve(Struct struct);
-
-}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractStruct.java b/java/common/src/main/java/org/apache/qpidity/Encodable.java
index 6e70fe3db4..6a734a1791 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractStruct.java
+++ b/java/common/src/main/java/org/apache/qpidity/Encodable.java
@@ -22,9 +22,16 @@ package org.apache.qpidity;
/**
- * AbstractStruct
+ * Encodable
*
* @author Rafael H. Schloming
*/
-public abstract class AbstractStruct implements Struct {}
+public interface Encodable
+{
+
+ void write(Encoder enc, byte major, byte minor);
+
+ void read(Decoder dec, byte major, byte minor);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java
index 64ee4ef6ce..b389d54390 100644
--- a/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java
@@ -40,23 +40,42 @@ class FragmentDecoder extends AbstractDecoder
private final Iterator<ByteBuffer> fragments;
private ByteBuffer current;
- public FragmentDecoder(StructFactory factory, Iterator<ByteBuffer> fragments)
+ public FragmentDecoder(byte major, byte minor, Iterator<ByteBuffer> fragments)
{
- super(factory);
+ super(major, minor);
this.fragments = fragments;
this.current = null;
}
- private void preRead()
+ public boolean hasRemaining()
{
- if (current == null)
+ advance();
+ return current != null || fragments.hasNext();
+ }
+
+ private void advance()
+ {
+ while (current == null && fragments.hasNext())
{
- if (!fragments.hasNext())
+ current = fragments.next();
+ if (current.hasRemaining())
+ {
+ break;
+ }
+ else
{
- throw new BufferUnderflowException();
+ current = null;
}
+ }
+ }
- current = fragments.next();
+ private void preRead()
+ {
+ advance();
+
+ if (current == null)
+ {
+ throw new BufferUnderflowException();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/Frame.java
index 1c8bb64fcf..d5076e0ef0 100644
--- a/java/common/src/main/java/org/apache/qpidity/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/Frame.java
@@ -51,6 +51,8 @@ class Frame implements Iterable<ByteBuffer>
public static final byte RESERVED = 0x0;
+ public static final byte VERSION = 0x0;
+
public static final byte FIRST_SEG = 0x8;
public static final byte LAST_SEG = 0x4;
public static final byte FIRST_FRAME = 0x2;
diff --git a/java/common/src/main/java/org/apache/qpidity/Header.java b/java/common/src/main/java/org/apache/qpidity/Header.java
index 57e04cacdf..9b6373df19 100644
--- a/java/common/src/main/java/org/apache/qpidity/Header.java
+++ b/java/common/src/main/java/org/apache/qpidity/Header.java
@@ -27,4 +27,4 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-public interface Header extends Struct {}
+public abstract class Header extends Struct {}
diff --git a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
index 8a407cd9f6..1855681756 100644
--- a/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/HeaderHandler.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpidity;
+import java.nio.ByteBuffer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
/**
* HeaderHandler
@@ -27,13 +32,33 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class HeaderHandler<C> implements Handler<Event<C,Segment>>
+class HeaderHandler implements Handler<Event<Session,Segment>>
{
- public void handle(Event<C,Segment> event)
+ private static final Struct[] EMPTY_STRUCT_ARRAY = {};
+
+ private final byte major;
+ private final byte minor;
+ private final SessionDelegate delegate;
+
+ public HeaderHandler(byte major, byte minor, SessionDelegate delegate)
+ {
+ this.major = major;
+ this.minor = minor;
+ this.delegate = delegate;
+ }
+
+ public void handle(Event<Session,Segment> event)
{
System.out.println("got header segment:\n " + event.target);
-
+ Iterator<ByteBuffer> fragments = event.target.getFragments();
+ FragmentDecoder dec = new FragmentDecoder(major, minor, fragments);
+ ArrayList<Struct> headers = new ArrayList();
+ while (dec.hasRemaining())
+ {
+ headers.add(dec.readLongStruct());
+ }
+ delegate.headers(event.context, headers.toArray(EMPTY_STRUCT_ARRAY));
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/InputHandler.java
index 5cd4fe87c1..78cb3d1b60 100644
--- a/java/common/src/main/java/org/apache/qpidity/InputHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/InputHandler.java
@@ -34,7 +34,7 @@ import static org.apache.qpidity.InputHandler.State.*;
class InputHandler implements Handler<ByteBuffer>
{
- enum State
+ public enum State
{
PROTO_HDR,
PROTO_HDR_M,
diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/Method.java
index ab8825743c..fb269dfb7b 100644
--- a/java/common/src/main/java/org/apache/qpidity/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/Method.java
@@ -27,11 +27,18 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-interface Method extends Struct
+public abstract class Method extends Struct
{
- byte getEncodedTrack();
+ public abstract boolean hasPayload();
- byte getSegmentType();
+ public abstract byte getEncodedTrack();
+
+ // XXX: do we need a segment base type?
+ public byte getSegmentType()
+ {
+ // XXX
+ return Frame.METHOD;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
index b4545705ea..6c7389b02d 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
@@ -34,23 +34,29 @@ import java.util.Iterator;
class MethodDispatcher<C> implements Handler<Event<C,Segment>>
{
- final private StructFactory factory;
- final private DelegateResolver<C> resolver;
+ final private byte major;
+ final private byte minor;
+ final private Delegate<C> delegate;
+ // XXX: should be on session
+ private int count = 0;
- public MethodDispatcher(StructFactory factory, DelegateResolver<C> resolver)
+ public MethodDispatcher(byte major, byte minor, Delegate<C> delegate)
{
- this.factory = factory;
- this.resolver = resolver;
+ this.major = major;
+ this.minor = minor;
+ this.delegate = delegate;
}
public void handle(Event<C,Segment> event)
{
System.out.println("got method segment:\n " + event.target);
Iterator<ByteBuffer> fragments = event.target.getFragments();
- Decoder dec = new FragmentDecoder(factory, fragments);
+ Decoder dec = new FragmentDecoder(major, minor, fragments);
int type = (int) dec.readLong();
- Struct struct = factory.create(type, dec);
- Delegate<C> delegate = resolver.resolve(struct);
+ Struct struct = Struct.create(type);
+ struct.setId(count++);
+ struct.read(dec, major, minor);
+ System.out.println("delegating " + struct + "[" + struct.getId() + "] to " + delegate);
struct.delegate(event.context, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
index 8f43f4c06f..dd952fe2c2 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
@@ -32,9 +32,9 @@ package org.apache.qpidity;
class MethodHandler<C> extends TypeSwitch<C>
{
- public MethodHandler(StructFactory factory, DelegateResolver<C> resolver)
+ public MethodHandler(byte major, byte minor, Delegate<C> delegate)
{
- MethodDispatcher md = new MethodDispatcher<C>(factory, resolver);
+ MethodDispatcher md = new MethodDispatcher<C>(major, minor, delegate);
map(Frame.METHOD, new SegmentAssembler<C>(md));
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
index b54f53dd80..17bbe5c0a7 100644
--- a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
@@ -22,6 +22,7 @@ package org.apache.qpidity;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
@@ -45,6 +46,15 @@ import org.apache.mina.transport.socket.nio.SocketConnector;
class MinaHandler implements IoHandler
{
+ private final ConnectionDelegate delegate;
+ private final InputHandler.State state;
+
+ public MinaHandler(ConnectionDelegate delegate, InputHandler.State state)
+ {
+ this.delegate = delegate;
+ this.state = state;
+ }
+
public void messageReceived(IoSession ssn, Object obj)
{
Connection conn = (Connection) ssn.getAttachment();
@@ -62,21 +72,29 @@ class MinaHandler implements IoHandler
e.printStackTrace();
}
- public void sessionCreated(IoSession ssn)
+ public void sessionCreated(final IoSession ssn)
{
System.out.println("created " + ssn);
}
public void sessionOpened(final IoSession ssn)
{
+ System.out.println("opened " + ssn);
Connection conn = new Connection(new Handler<java.nio.ByteBuffer>()
{
public void handle(java.nio.ByteBuffer buf)
{
ssn.write(ByteBuffer.wrap(buf));
}
- });
+ },
+ delegate,
+ state);
ssn.setAttachment(conn);
+ // XXX
+ synchronized (ssn)
+ {
+ ssn.notifyAll();
+ }
}
public void sessionClosed(IoSession ssn)
@@ -94,30 +112,48 @@ class MinaHandler implements IoHandler
{
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
if (args[0].equals("accept")) {
- accept(args);
+ accept("0.0.0.0", 5672, SessionDelegateStub.source());
} else if (args[0].equals("connect")) {
- connect(args);
+ connect("0.0.0.0", 5672, SessionDelegateStub.source());
}
}
- public static final void accept(String[] args) throws IOException
+ public static final void accept(String host, int port,
+ ConnectionDelegate delegate)
+ throws IOException
{
IoAcceptor acceptor = new SocketAcceptor();
- acceptor.bind(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler());
+ acceptor.bind(new InetSocketAddress(host, port),
+ new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
}
- public static final void connect(String[] args)
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
{
+ MinaHandler handler = new MinaHandler(delegate,
+ InputHandler.State.FRAME_HDR);
+ SocketAddress addr = new InetSocketAddress(host, port);
IoConnector connector = new SocketConnector();
- ConnectFuture cf = connector.connect(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler());
+ ConnectFuture cf = connector.connect(addr, handler);
cf.join();
IoSession ssn = cf.getSession();
- ByteBuffer bb = ByteBuffer.allocate(1024);
- bb.put("AMQP".getBytes());
- bb.flip();
- for (int i = 0; i < 10; i++) {
- ssn.write(bb);
+ // XXX
+ synchronized (ssn)
+ {
+ while (ssn.getAttachment() == null)
+ {
+ try
+ {
+ ssn.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
+ Connection conn = (Connection) ssn.getAttachment();
+ return conn;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
index 1f12e285e4..5e397ae574 100644
--- a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpidity;
+import java.nio.ByteBuffer;
+
/**
* ProtocolHeader
@@ -30,6 +32,8 @@ package org.apache.qpidity;
class ProtocolHeader
{
+ private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
+
final private byte instance;
final private byte major;
final private byte minor;
@@ -56,6 +60,18 @@ class ProtocolHeader
return minor;
}
+ public ByteBuffer toByteBuffer()
+ {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.put(AMQP);
+ buf.put((byte) 1);
+ buf.put((byte) 1);
+ buf.put((byte) 0);
+ buf.put((byte) 10);
+ buf.flip();
+ return buf;
+ }
+
public String toString()
{
return String.format("AMQP.%d %d-%d", instance, major, minor);
diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java
index 7d4e5aabe4..25e29bdcf3 100644
--- a/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java
@@ -49,13 +49,11 @@ class SegmentEncoder extends AbstractEncoder
private ByteBuffer frame;
private boolean first;
- public SegmentEncoder(Handler<ByteBuffer> handler, int max,
- byte flags,
- byte track,
- byte type,
- int channel,
- int remaining)
+ public SegmentEncoder(byte major, byte minor, Handler<ByteBuffer> handler,
+ int max, byte flags, byte track, byte type,
+ int channel, int remaining)
{
+ super(major, minor);
if (max < HEADER_SIZE + 1)
{
throw new IllegalArgumentException
@@ -115,7 +113,7 @@ class SegmentEncoder extends AbstractEncoder
}
}
- @Override protected void put(byte b)
+ @Override public void put(byte b)
{
preWrite();
frame.put(b);
@@ -123,7 +121,7 @@ class SegmentEncoder extends AbstractEncoder
postWrite();
}
- @Override protected void put(ByteBuffer src)
+ @Override public void put(ByteBuffer src)
{
if (src.remaining() > remaining)
{
@@ -147,7 +145,8 @@ class SegmentEncoder extends AbstractEncoder
buf.put("AMQP_PROTOCOL_HEADER".getBytes());
buf.flip();
- SegmentEncoder enc = new SegmentEncoder(new Handler<ByteBuffer>()
+ SegmentEncoder enc = new SegmentEncoder((byte) 0, (byte) 10,
+ new Handler<ByteBuffer>()
{
public void handle(ByteBuffer frame)
{
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index 666d6463c0..b3116570e8 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpidity;
+import java.nio.ByteBuffer;
+
import java.util.HashMap;
import java.util.Map;
@@ -35,80 +37,88 @@ public class Session extends Invoker
// channel may be null
Channel channel;
- private int command_id = 0;
- // XXX
- final Map<Integer,Handler<Struct>> handlers = new HashMap<Integer,Handler<Struct>>();
-
- public void attach(Channel channel)
+ // outgoing command count
+ private long commandsOut = 0;
+ // XXX: incoming command count not used
+ // incoming command count
+ private long commandsIn = 0;
+ private Map<Long,Method> commands = new HashMap<Long,Method>();
+ private long mark = 0;
+
+ public long getCommandsOut()
{
- this.channel = channel;
- channel.setSession(this);
+ return commandsOut;
}
- protected void invoke(Method m)
+ public long getCommandsIn()
{
- command_id++;
- channel.write(m);
+ return commandsIn;
}
- protected void invoke(Method m, Handler<Struct> handler)
+ public void attach(Channel channel)
{
- invoke(m);
- handlers.put(command_id, handler);
+ this.channel = channel;
+ channel.setSession(this);
}
- protected StructFactory getFactory()
+ public Method getCommand(long id)
{
- return channel.getFactory();
+ System.out.println(id + " " + commands);
+ return commands.get(id);
}
- // -----------------------------------------
- // Messaging Methods
- // ------------------------------------------
- public void messageTransfer(String destination, Message msg, Option ... _options) throws QpidException
+ void complete(long lower, long upper)
{
-
+ for (long id = lower; id <= upper; id++)
+ {
+ commands.put(id, null);
+ }
}
- public void data(byte[] src) throws QpidException
+ void complete(long mark)
{
- // TODO Auto-generated method stub
+ complete(this.mark, mark);
+ this.mark = mark;
}
- public void endData() throws QpidException
+ protected void invoke(Method m)
{
- // TODO Auto-generated method stub
+ if (m.getEncodedTrack() == Frame.L4)
+ {
+ long cmd = commandsOut++;
+ commands.put(cmd, m);
+ }
+ channel.method(m);
}
- public void messageHeaders(Header... headers) throws QpidException
+ public void headers(Struct ... headers)
{
- // TODO Auto-generated method stub
+ channel.headers(headers);
}
- public void messageTransfer(String destination,Option... options) throws QpidException
+ public void data(ByteBuffer buf)
{
- // TODO Auto-generated method stub
+ channel.data(buf);
}
- public void messageAcknowledge() throws QpidException
+ public void data(String str)
{
- // TODO Auto-generated method stub
+ channel.data(str);
}
- public boolean messageAcquire() throws QpidException
+ public void data(byte[] bytes)
{
- // TODO Auto-generated method stub
- return false;
+ channel.data(bytes);
}
- public void messageReject() throws QpidException
+ public void end()
{
- // TODO Auto-generated method stub
+ channel.end();
}
- public void messageRelease() throws QpidException
+ protected void invoke(Method m, Handler<Struct> handler)
{
- // TODO Auto-generated method stub
- }
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
index fe52f0c90b..c2fbe6ba00 100644
--- a/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java
+++ b/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
@@ -22,24 +22,29 @@ package org.apache.qpidity;
/**
- * SimpleDelegateResolver
+ * SessionDelegate
*
* @author Rafael H. Schloming
*/
-class SimpleDelegateResolver<C> implements DelegateResolver<C>
+public abstract class SessionDelegate extends Delegate<Session>
{
- private final Delegate<C> delegate;
+ public abstract void headers(Session ssn, Struct ... headers);
- public SimpleDelegateResolver(Delegate<C> delegate)
- {
- this.delegate = delegate;
- }
+ public abstract void data(Session ssn, Frame frame);
- public Delegate<C> resolve(Struct struct)
+ @Override public void executionComplete(Session ssn, ExecutionComplete excmp)
{
- return delegate;
+ Range<Long>[] ranges = excmp.getRangedExecutionSet();
+ if (ranges != null)
+ {
+ for (Range<Long> range : ranges)
+ {
+ ssn.complete(range.getLower(), range.getUpper());
+ }
+ }
+ ssn.complete(excmp.getCumulativeExecutionMark());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java
index b6ecdd2818..141c501104 100644
--- a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java
@@ -36,11 +36,12 @@ class SizeEncoder extends AbstractEncoder
private int size;
- public SizeEncoder() {
- this(0);
+ public SizeEncoder(byte major, byte minor) {
+ this(major, minor, 0);
}
- public SizeEncoder(int size) {
+ public SizeEncoder(byte major, byte minor, int size) {
+ super(major, minor);
this.size = size;
}
@@ -64,6 +65,7 @@ class SizeEncoder extends AbstractEncoder
@Override public void writeShortstr(String s)
{
+ if (s == null) { s = ""; }
if (s.length() > 255) {
throw new IllegalArgumentException(s);
}
@@ -73,6 +75,7 @@ class SizeEncoder extends AbstractEncoder
@Override public void writeLongstr(String s)
{
+ if (s == null) { s = ""; }
writeLong(s.length());
size += s.length();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Struct.java b/java/common/src/main/java/org/apache/qpidity/Struct.java
index 63b0db2c12..16b02a72a3 100644
--- a/java/common/src/main/java/org/apache/qpidity/Struct.java
+++ b/java/common/src/main/java/org/apache/qpidity/Struct.java
@@ -27,9 +27,27 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-interface Struct extends Delegator, Writable
+public abstract class Struct implements Delegator, Encodable
{
- int getEncodedType();
+ public static Struct create(int type)
+ {
+ return StructFactory.create(type);
+ }
+
+ // XXX: command subclass?
+ private long id;
+
+ public final long getId()
+ {
+ return id;
+ }
+
+ void setId(long id)
+ {
+ this.id = id;
+ }
+
+ abstract int getEncodedType();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Stub.java b/java/common/src/main/java/org/apache/qpidity/Stub.java
index e4ac8358f3..94d5932d20 100644
--- a/java/common/src/main/java/org/apache/qpidity/Stub.java
+++ b/java/common/src/main/java/org/apache/qpidity/Stub.java
@@ -10,11 +10,15 @@ import static org.apache.qpidity.Option.*;
public class Stub {
- private static Connection conn = new Connection(new ConsoleOutput());
+ private static final byte major = 0;
+ private static final byte minor = 10;
+
+ private static Connection conn = new Connection(new ConsoleOutput(),
+ SessionDelegateStub.source());
static
{
- conn.init(new ProtocolHeader((byte) 1, (byte) 0, (byte) 10));
+ conn.init(new ProtocolHeader((byte) 1, major, minor));
}
private static void frame(byte track, byte type, boolean first, boolean last) {
@@ -22,21 +26,21 @@ public class Stub {
}
private static void frame(byte track, byte type, boolean first, boolean last, Method m) {
- SizeEncoder sizer = new SizeEncoder();
+ SizeEncoder sizer = new SizeEncoder(major, minor);
if (m != null) {
sizer.writeLong(m.getEncodedType());
- m.write(sizer);
+ m.write(sizer, major, minor);
sizer.flush();
}
ByteBuffer buf = ByteBuffer.allocate(sizer.getSize());
if (m != null) {
- Encoder enc = new BBEncoder(buf);
+ Encoder enc = new BBEncoder(major, minor, buf);
enc.writeLong(m.getEncodedType());
- m.write(enc);
+ m.write(enc, major, minor);
enc.flush();
}
buf.flip();
- byte flags = 0;
+ byte flags = Frame.VERSION;
if (first) { flags |= Frame.FIRST_FRAME; }
if (last) { flags |= Frame.LAST_FRAME; }
Frame frame = new Frame(flags, type, track, 0);
@@ -45,13 +49,12 @@ public class Stub {
}
public static final void main(String[] args) {
- StructFactory f = new StructFactory_v0_10();
- frame(Frame.L2, Frame.METHOD, true, true, f.newSessionOpen(0));
+ frame(Frame.L2, Frame.METHOD, true, true, new SessionOpen(0));
frame(Frame.L4, Frame.METHOD, true, false,
- f.newQueueDeclare("asdf", "alternate", null, DURABLE));
+ new QueueDeclare("asdf", "alternate", null, DURABLE));
frame(Frame.L4, Frame.METHOD, false, false);
frame(Frame.L3, Frame.METHOD, true, true,
- f.newExchangeDeclare("exchange", "type", "alternate", null));
+ new ExchangeDeclare("exchange", "type", "alternate", null));
frame(Frame.L4, Frame.METHOD, false, true);
frame(Frame.L4, Frame.HEADER, true, false);
frame(Frame.L4, Frame.HEADER, false, false);
@@ -60,59 +63,43 @@ public class Stub {
frame(Frame.L4, Frame.BODY, false, false);
frame(Frame.L4, Frame.BODY, false, false);
frame(Frame.L1, Frame.METHOD, true, true,
- f.newExchangeDeclare("exchange", "type", "alternate", null));
+ new ExchangeDeclare("exchange", "type", "alternate", null));
frame(Frame.L4, Frame.BODY, false, false);
frame(Frame.L4, Frame.BODY, false, true);
}
}
-//====: Channel and Session Delegates :=======================================//
-
-class ChannelDelegate extends Delegate<Channel> {
+class SessionDelegateStub extends SessionDelegate {
- public @Override void sessionOpen(Channel channel, SessionOpen open) {
- Session ssn = new Session();
- ssn.attach(channel);
- long lifetime = open.getDetachedLifetime();
- System.out.println("Session Opened lifetime = " + lifetime);
- try
- {
- ssn.sessionAttached(UUID.randomUUID(), lifetime);
- }
- catch (QpidException e)
+ public static final ConnectionDelegate source()
+ {
+ return new ConnectionDelegate()
{
- throw new RuntimeException(e);
- }
+ public SessionDelegate getSessionDelegate()
+ {
+ return new SessionDelegateStub();
+ }
+ };
}
-}
-
-class SessionDelegate extends Delegate<Session> {
-
public @Override void queueDeclare(Session session, QueueDeclare qd) {
System.out.println("got a queue declare: " + qd.getQueue());
}
public @Override void exchangeDeclare(Session session, ExchangeDeclare ed) {
System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType());
- try
- {
- session.queueDeclare("asdf", "alternate", null);
- }
- catch(QpidException e)
- {
- throw new RuntimeException(e);
- }
+ session.queueDeclare("asdf", "alternate", null);
}
- /*
- public @Override void executionResult(Session session, ExecutionResult result) {
- Handler<Struct> handler = session.handlers.get(result.getCommandId());
- if (handler != null) {
- handler.handle(result.getData());
- }
- }
- */
+ public void data(Session ssn, Frame frame)
+ {
+ System.out.println("got data: " + frame);
+ }
+
+ public void headers(Session ssn, Struct ... headers)
+ {
+ System.out.println("got headers: " + headers);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyBroker.java b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
new file mode 100644
index 0000000000..534f075dbd
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.io.IOException;
+
+import java.nio.ByteBuffer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import static org.apache.qpidity.Functions.*;
+
+
+/**
+ * ToyBroker
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ToyBroker extends SessionDelegate
+{
+
+ private Map<String,Queue<Message>> queues;
+ private MessageTransfer xfr = null;
+ private DeliveryProperties props = null;
+ private Struct[] headers = null;
+ private List<Frame> frames = null;
+
+ public ToyBroker(Map<String,Queue<Message>> queues)
+ {
+ this.queues = queues;
+ }
+
+ @Override public void queueDeclare(Session ssn, QueueDeclare qd)
+ {
+ queues.put(qd.getQueue(), new LinkedList());
+ System.out.println("declared queue: " + qd.getQueue());
+ }
+
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ this.xfr = xfr;
+ frames = new ArrayList();
+ }
+
+ public void headers(Session ssn, Struct ... headers)
+ {
+ if (xfr == null || frames == null)
+ {
+ ssn.connectionClose(503, "no method segment", 0, 0);
+ // XXX: close at our end
+ return;
+ }
+
+ for (Struct hdr : headers)
+ {
+ if (hdr instanceof DeliveryProperties)
+ {
+ props = (DeliveryProperties) hdr;
+ }
+ }
+
+ if (props != null && !props.getDiscardUnroutable())
+ {
+ String dest = xfr.getDestination();
+ if (!queues.containsKey(dest))
+ {
+ reject(ssn);
+ }
+ }
+
+ this.headers = headers;
+ }
+
+ public void data(Session ssn, Frame frame)
+ {
+ if (xfr == null || frames == null)
+ {
+ ssn.connectionClose(503, "no method segment", 0, 0);
+ // XXX: close at our end
+ return;
+ }
+
+ frames.add(frame);
+
+ if (frame.isLastSegment() && frame.isLastFrame())
+ {
+ String dest = xfr.getDestination();
+ Queue queue = queues.get(dest);
+ if (queue == null)
+ {
+ reject(ssn);
+ }
+ else
+ {
+ Message m = new Message(headers, frames);
+ queue.offer(m);
+ System.out.println("queued " + m);
+ }
+ xfr = null;
+ frames = null;
+ }
+ }
+
+ private void reject(Session ssn)
+ {
+ if (props != null && props.getDiscardUnroutable())
+ {
+ return;
+ }
+ else
+ {
+ long id = xfr.getId();
+ Range[] ranges = {new Range<Long>(id, id)};
+ ssn.messageReject(ranges, 0, "no such destination");
+ }
+ }
+
+ private class Message
+ {
+ private final Struct[] headers;
+ private final List<Frame> frames;
+
+ public Message(Struct[] headers, List<Frame> frames)
+ {
+ this.headers = headers;
+ this.frames = frames;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ if (headers != null)
+ {
+ boolean first = true;
+ for (Struct hdr : headers)
+ {
+ if (first) { first = false; }
+ else { sb.append(" "); }
+ sb.append(hdr);
+ }
+ }
+
+ for (Frame f : frames)
+ {
+ for (ByteBuffer b : f)
+ {
+ sb.append(" | ");
+ sb.append(str(b));
+ }
+ }
+
+ return sb.toString();
+ }
+
+ }
+
+ public static final void main(String[] args) throws IOException
+ {
+ final Map<String,Queue<Message>> queues =
+ new HashMap<String,Queue<Message>>();
+ MinaHandler.accept("0.0.0.0", 5672, new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ToyBroker(queues);
+ }
+ });
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ToyClient.java b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
new file mode 100644
index 0000000000..1e57de3265
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ToyClient.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * ToyClient
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ToyClient extends SessionDelegate
+{
+
+ @Override public void messageReject(Session ssn, MessageReject reject)
+ {
+ for (Range<Long> range : reject.getTransfers())
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("message rejected: " +
+ ssn.getCommand((int) l));
+ }
+ }
+ }
+
+ public void headers(Session ssn, Struct ... headers)
+ {
+ for (Struct hdr : headers)
+ {
+ System.out.println("header: " + hdr);
+ }
+ }
+
+ public void data(Session ssn, Frame frame)
+ {
+ System.out.println("got data: " + frame);
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = MinaHandler.connect("0.0.0.0", 5672,
+ new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ToyClient();
+ }
+ });
+ conn.getOutputHandler().handle(conn.getHeader().toByteBuffer());
+
+ Channel ch = conn.getChannel(0);
+ Session ssn = new Session();
+ ssn.attach(ch);
+ ssn.sessionOpen(1234);
+
+ ssn.queueDeclare("asdf", null, null);
+
+ ssn.messageTransfer("asdf", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties(),
+ new MessageProperties());
+ ssn.data("this is the data");
+ ssn.end();
+
+ ssn.messageTransfer("fdsa", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.end();
+ }
+
+}