summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-08-01 05:28:01 +0000
committerRafael H. Schloming <rhs@apache.org>2007-08-01 05:28:01 +0000
commit7c16ea45a0193861868f27a53403995d57e49e66 (patch)
tree33c9d6d2814818d07a50ed28cc384293a8de5fe5 /java/common
parent92de1f9135b949a9b75f818190ad1f010e336d78 (diff)
downloadqpid-python-7c16ea45a0193861868f27a53403995d57e49e66.tar.gz
plumbed the input/output into MINA and filled out more of the encoding/decoding
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561665 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java149
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java163
-rw-r--r--java/common/src/main/java/org/apache/qpidity/AbstractMethod.java18
-rw-r--r--java/common/src/main/java/org/apache/qpidity/BBDecoder.java85
-rw-r--r--java/common/src/main/java/org/apache/qpidity/BBEncoder.java73
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Channel.java54
-rw-r--r--java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java64
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Connection.java69
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java42
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ContentHandler.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Encoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java91
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Frame.java138
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Functions.java80
-rw-r--r--java/common/src/main/java/org/apache/qpidity/InputHandler.java230
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Method.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java25
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MethodHandler.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/MinaHandler.java123
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolActions.java39
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolError.java47
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java39
-rw-r--r--java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java64
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Segment.java54
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java173
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Session.java8
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java45
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SizeEncoder.java60
-rw-r--r--java/common/src/main/java/org/apache/qpidity/SliceIterator.java59
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Stub.java50
-rw-r--r--java/common/src/main/java/org/apache/qpidity/Switch.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/TrackSwitch.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/TypeSwitch.java4
34 files changed, 1714 insertions, 368 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
new file mode 100644
index 0000000000..42da09f2d9
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.qpidity.Functions.*;
+
+
+/**
+ * AbstractDecoder
+ *
+ * @author Rafael H. Schloming
+ */
+
+abstract class AbstractDecoder implements Decoder
+{
+
+ protected abstract byte get();
+
+ protected abstract void get(byte[] bytes);
+
+ private byte bits = 0x0;
+ private byte nbits = 0;
+
+ public boolean readBit()
+ {
+ if (nbits == 0)
+ {
+ bits = get();
+ nbits = 8;
+ }
+
+ nbits -= 1;
+
+ boolean result = ((bits >>> nbits) & 0x01) != 0;
+ return result;
+ }
+
+ private void clearBits()
+ {
+ bits = 0x0;
+ nbits = 0;
+ }
+
+ public short readOctet()
+ {
+ clearBits();
+ return unsigned(get());
+ }
+
+ public int readShort()
+ {
+ clearBits();
+ int i = get() << 8;
+ i |= get();
+ return i;
+ }
+
+ public long readLong()
+ {
+ clearBits();
+ long l = get() << 24;
+ l |= get() << 16;
+ l |= get() << 8;
+ l |= get();
+ return l;
+ }
+
+ public long readLonglong()
+ {
+ clearBits();
+ long l = get() << 56;
+ l |= get() << 48;
+ l |= get() << 40;
+ l |= get() << 32;
+ l |= get() << 24;
+ l |= get() << 16;
+ l |= get() << 8;
+ l |= get();
+ return l;
+ }
+
+ public long readTimestamp()
+ {
+ return readLonglong();
+ }
+
+
+ public String readShortstr()
+ {
+ short size = readOctet();
+ byte[] bytes = new byte[size];
+ get(bytes);
+ return new String(bytes);
+ }
+
+ public String readLongstr()
+ {
+ long size = readLong();
+ assert size <= Integer.MAX_VALUE;
+ byte[] bytes = new byte[(int) size];
+ get(bytes);
+ return new String(bytes);
+ }
+
+ public Map<String,?> readTable()
+ {
+ //throw new Error("TODO");
+ return null;
+ }
+
+ public Range<Long>[] readRfc1982LongSet()
+ {
+ throw new Error("TODO");
+ }
+
+ public UUID readUuid()
+ {
+ long msb = readLong();
+ long lsb = readLong();
+ return new UUID(msb, lsb);
+ }
+
+ public String readContent()
+ {
+ throw new Error("Deprecated");
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
new file mode 100644
index 0000000000..5ca68fe564
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.ByteBuffer;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.qpidity.Functions.*;
+
+
+/**
+ * AbstractEncoder
+ *
+ * @author Rafael H. Schloming
+ */
+
+abstract class AbstractEncoder implements Encoder
+{
+
+ protected abstract void put(byte b);
+
+ protected abstract void put(ByteBuffer src);
+
+ private byte bits = 0x0;
+ private byte nbits = 0;
+
+ public void writeBit(boolean b)
+ {
+ if (b)
+ {
+ bits |= 1 << nbits;
+ }
+
+ nbits += 1;
+
+ if (nbits == 8)
+ {
+ flushBits();
+ }
+ }
+
+ private void flushBits()
+ {
+ if (nbits > 0)
+ {
+ put(bits);
+ bits = 0x0;
+ nbits = 0;
+ }
+ }
+
+ public void writeOctet(short b)
+ {
+ assert b < 0x100;
+
+ flushBits();
+ put((byte) b);
+ }
+
+ public void writeShort(int s)
+ {
+ assert s < 0x10000;
+
+ flushBits();
+ put(lsb(s >> 8));
+ put(lsb(s));
+ }
+
+ public void writeLong(long i)
+ {
+ assert i < 0x100000000L;
+
+ flushBits();
+ put(lsb(i >> 24));
+ put(lsb(i >> 16));
+ put(lsb(i >> 8));
+ put(lsb(i));
+ }
+
+ public void writeLonglong(long l)
+ {
+ flushBits();
+ put(lsb(l >> 56));
+ put(lsb(l >> 48));
+ put(lsb(l >> 40));
+ put(lsb(l >> 32));
+ put(lsb(l >> 24));
+ put(lsb(l >> 16));
+ put(lsb(l >> 8));
+ put(lsb(l));
+ }
+
+
+ public void writeTimestamp(long l)
+ {
+ flushBits();
+ writeLonglong(l);
+ }
+
+
+ public void writeShortstr(String s)
+ {
+ if (s.length() > 255) {
+ throw new IllegalArgumentException(s);
+ }
+ writeOctet((short) s.length());
+ put(ByteBuffer.wrap(s.getBytes()));
+ }
+
+ public void writeLongstr(String s)
+ {
+ writeLong(s.length());
+ put(ByteBuffer.wrap(s.getBytes()));
+ }
+
+
+ public void writeTable(Map<String,?> table)
+ {
+ //throw new Error("TODO");
+ }
+
+ public void writeRfc1982LongSet(Range<Long>[] ranges)
+ {
+ throw new Error("TODO");
+ }
+
+ public void writeUuid(UUID uuid)
+ {
+ writeLong(uuid.getMostSignificantBits());
+ writeLong(uuid.getLeastSignificantBits());
+ }
+
+ public void writeContent(String c)
+ {
+ throw new Error("Deprecated");
+ }
+
+ public void flush()
+ {
+ flushBits();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java b/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java
index 7db1aeea43..258c33d715 100644
--- a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java
+++ b/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpidity;
-import java.nio.ByteBuffer;
/**
* AbstractMethod
@@ -28,4 +27,19 @@ import java.nio.ByteBuffer;
* @author Rafael H. Schloming
*/
-abstract class AbstractMethod extends AbstractStruct implements Method {}
+abstract class AbstractMethod extends AbstractStruct implements Method {
+
+ public byte getEncodedTrack()
+ {
+ // XXX
+ return Frame.L2;
+ }
+
+ // XXX: do we need a segment base type?
+ public byte getSegmentType()
+ {
+ // XXX
+ return Frame.METHOD;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java
index 9c5da34cd1..08ac03113f 100644
--- a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java
@@ -22,8 +22,6 @@ package org.apache.qpidity;
import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.UUID;
/**
* BBDecoder
@@ -31,24 +29,9 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-class BBDecoder implements Decoder
+class BBDecoder extends AbstractDecoder
{
- private static final short unsigned(byte b)
- {
- return (short) ((0x100 + b) & 0xFF);
- }
-
- private static final int unsigned(short s)
- {
- return (0x10000 + s) & 0xFFFF;
- }
-
- private static final long unsigned(int i)
- {
- return (0x1000000000L + i) & 0xFFFFFFFFL;
- }
-
private final ByteBuffer in;
public BBDecoder(ByteBuffer in)
@@ -56,74 +39,14 @@ class BBDecoder implements Decoder
this.in = in;
}
- public boolean readBit()
- {
- //throw new Error("TODO");
- return false;
- }
-
- public short readOctet()
- {
- return unsigned(in.get());
- }
-
- public int readShort()
- {
- return unsigned(in.getShort());
- }
-
- public long readLong()
- {
- return unsigned(in.getInt());
- }
-
- public long readLonglong()
+ protected byte get()
{
- throw new Error("TODO");
+ return in.get();
}
- public long readTimestamp()
+ protected void get(byte[] bytes)
{
- throw new Error("TODO");
- }
-
-
- public String readShortstr()
- {
- short size = readOctet();
- byte[] bytes = new byte[size];
- in.get(bytes);
- return new String(bytes);
- }
-
- public String readLongstr()
- {
- long size = readLong();
- assert size <= Integer.MAX_VALUE;
- byte[] bytes = new byte[(int) size];
in.get(bytes);
- return new String(bytes);
- }
-
- public Map<String,?> readTable()
- {
- //throw new Error("TODO");
- return null;
- }
-
- public Range<Long>[] readRfc1982LongSet()
- {
- throw new Error("TODO");
- }
-
- public UUID readUuid()
- {
- throw new Error("TODO");
- }
-
- public String readContent()
- {
- throw new Error("TODO");
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java
index 50b8659fd7..947a461ab3 100644
--- a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java
@@ -22,8 +22,6 @@ package org.apache.qpidity;
import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.UUID;
/**
* BBEncoder
@@ -31,7 +29,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-class BBEncoder implements Encoder
+class BBEncoder extends AbstractEncoder
{
private final ByteBuffer out;
@@ -40,75 +38,14 @@ class BBEncoder implements Encoder
this.out = out;
}
- public void writeBit(boolean b)
+ @Override protected void put(byte b)
{
- //throw new Error("TODO");
+ out.put(b);
}
- public void writeOctet(short b)
+ @Override protected void put(ByteBuffer src)
{
- assert b < 0x100;
- out.put((byte) b);
- }
-
- public void writeShort(int s)
- {
- assert s < 0x10000;
- out.putShort((short) s);
- }
-
- public void writeLong(long i)
- {
- assert i < 0x100000000L;
- out.putInt((int) i);
- }
-
- public void writeLonglong(long l)
- {
- throw new Error("TODO");
- }
-
-
- public void writeTimestamp(long l)
- {
- throw new Error("TODO");
- }
-
-
- public void writeShortstr(String s)
- {
- if (s.length() > 255) {
- throw new IllegalArgumentException(s);
- }
- writeOctet((short) s.length());
- out.put(s.getBytes());
- }
-
- public void writeLongstr(String s)
- {
- writeLong(s.length());
- out.put(s.getBytes());
- }
-
-
- public void writeTable(Map<String,?> table)
- {
- //throw new Error("TODO");
- }
-
- public void writeRfc1982LongSet(Range<Long>[] ranges)
- {
- throw new Error("TODO");
- }
-
- public void writeUuid(UUID uuid)
- {
- throw new Error("TODO");
- }
-
- public void writeContent(String c)
- {
- throw new Error("TODO");
+ out.put(src);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java
index a9060622e8..4f23112da3 100644
--- a/java/common/src/main/java/org/apache/qpidity/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/Channel.java
@@ -20,32 +20,43 @@
*/
package org.apache.qpidity;
-
/**
* Channel
*
* @author Rafael H. Schloming
*/
-class Channel extends Invoker
- implements Handler<Frame>, DelegateResolver<Channel>
+class Channel extends Invoker implements Handler<Frame>
{
final private Connection connection;
+ final private int channel;
final private TrackSwitch<Channel> tracks;
-
// session may be null
private Session session;
- public Channel(Connection connection)
+ public Channel(Connection connection, int channel)
{
this.connection = connection;
+ this.channel = channel;
+
+ DelegateResolver<Channel> chDR =
+ new SimpleDelegateResolver<Channel>(new ChannelDelegate());
+ DelegateResolver<Session> ssnDR =
+ new SimpleDelegateResolver<Session>(new SessionDelegate());
+
tracks = new TrackSwitch<Channel>();
- tracks.map(Frame.L1, new MethodHandler<Channel>());
- tracks.map(Frame.L2, new MethodHandler<Channel>());
- tracks.map(Frame.L3, new SessionResolver<Frame>(new MethodHandler<Session>()));
- tracks.map(Frame.L4, new SessionResolver<Frame>(new ContentHandler<Session>()));
+ tracks.map(Frame.L1, new MethodHandler<Channel>(getFactory(), chDR));
+ tracks.map(Frame.L2, new MethodHandler<Channel>(getFactory(), chDR));
+ tracks.map(Frame.L3, new SessionResolver<Frame>
+ (new MethodHandler<Session>(getFactory(), ssnDR)));
+ tracks.map(Frame.L4, new SessionResolver<Frame>
+ (new ContentHandler<Session>(getFactory(), ssnDR)));
+ }
+
+ public int getEncodedChannel() {
+ return channel;
}
public Session getSession()
@@ -63,9 +74,25 @@ class Channel extends Invoker
tracks.handle(new Event<Channel,Frame>(this, frame));
}
- public void write(Writable writable)
+ public void write(Method m)
{
- System.out.println("writing: " + writable);
+ SizeEncoder sizer = new SizeEncoder();
+ sizer.writeLong(m.getEncodedType());
+ m.write(sizer);
+ sizer.flush();
+ int size = sizer.getSize();
+
+ // XXX: need to set header flags properly
+ SegmentEncoder enc = new SegmentEncoder(connection.getOutputHandler(),
+ connection.getMaxFrame(),
+ (byte) 0x0,
+ m.getEncodedTrack(),
+ m.getSegmentType(),
+ channel,
+ size);
+ enc.writeLong(m.getEncodedType());
+ m.write(enc);
+ enc.flush();
}
protected StructFactory getFactory()
@@ -73,11 +100,6 @@ class Channel extends Invoker
return connection.getFactory();
}
- public Delegate<Channel> resolve(Struct struct)
- {
- return new ChannelDelegate();
- }
-
protected void invoke(Method m)
{
write(m);
diff --git a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
index b8efa857f6..e9173a5c5b 100644
--- a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java
@@ -1,27 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpidity;
+
+/**
+ * CommonSessionDelegate
+ */
+
public class CommonSessionDelegate extends Delegate<Session>
{
- public void sessionAttached(Session session, SessionAttached struct) {}
-
- public void sessionFlow(Session session, SessionFlow struct) {}
-
- public void sessionFlowOk(Session session, SessionFlowOk struct) {}
-
- public void sessionClose(Session session, SessionClose struct) {}
-
- public void sessionClosed(Session session, SessionClosed struct) {}
-
- public void sessionResume(Session session, SessionResume struct) {}
-
- public void sessionPing(Session session, SessionPing struct) {}
-
- public void sessionPong(Session session, SessionPong struct) {}
-
- public void sessionSuspend(Session session, SessionSuspend struct) {}
-
- public void sessionDetached(Session session, SessionDetached struct) {}
-
+ @Override public void sessionAttached(Session session, SessionAttached struct) {}
+
+ @Override public void sessionFlow(Session session, SessionFlow struct) {}
+
+ @Override public void sessionFlowOk(Session session, SessionFlowOk struct) {}
+
+ @Override public void sessionClose(Session session, SessionClose struct) {}
+
+ @Override public void sessionClosed(Session session, SessionClosed struct) {}
+
+ @Override public void sessionResume(Session session, SessionResume struct) {}
+
+ @Override public void sessionPing(Session session, SessionPing struct) {}
+
+ @Override public void sessionPong(Session session, SessionPong struct) {}
+
+ @Override public void sessionSuspend(Session session, SessionSuspend struct) {}
+
+ @Override public void sessionDetached(Session session, SessionDetached struct) {}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java
index bb8b945051..7e31ca9b57 100644
--- a/java/common/src/main/java/org/apache/qpidity/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/Connection.java
@@ -23,6 +23,9 @@ package org.apache.qpidity;
import java.util.HashMap;
import java.util.Map;
+import java.nio.ByteBuffer;
+
+
/**
* Connection
*
@@ -33,27 +36,79 @@ import java.util.Map;
* short instead of Short
*/
-class Connection implements Handler<Frame>
+class Connection implements ProtocolActions
{
- final private Map<Short,Channel> channels = new HashMap<Short,Channel>();
- final private StructFactory factory = new StructFactory_v0_10();
+ final private Handler<ByteBuffer> input;
+ final private Handler<ByteBuffer> output;
+
+ final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+ private StructFactory factory;
+
+ // XXX
+ private int maxFrame = 64*1024;
+
+ public Connection(Handler<ByteBuffer> output)
+ {
+ this.input = new InputHandler(this);
+ this.output = output;
+ }
+
+ public Handler<ByteBuffer> getInputHandler()
+ {
+ return input;
+ }
+
+ public Handler<ByteBuffer> getOutputHandler()
+ {
+ return output;
+ }
+
+ public StructFactory getFactory()
+ {
+ return factory;
+ }
+
+ public int getMaxFrame()
+ {
+ return maxFrame;
+ }
- public void handle(Frame frame)
+ public void init(ProtocolHeader header)
+ {
+ System.out.println(header);
+ // XXX: hardcoded versions
+ if (header.getMajor() != 0 && header.getMinor() != 10)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.put("AMQP".getBytes());
+ buf.put((byte) 1);
+ buf.put((byte) 1);
+ buf.put((byte) 0);
+ buf.put((byte) 10);
+ buf.flip();
+ output.handle(buf);
+ // XXX: how do we close the connection?
+ } else {
+ factory = new StructFactory_v0_10();
+ }
+ }
+
+ public void frame(Frame frame)
{
Channel channel = channels.get(frame.getChannel());
if (channel == null)
{
- channel = new Channel(this);
+ channel = new Channel(this, frame.getChannel());
channels.put(frame.getChannel(), channel);
}
channel.handle(frame);
}
- public StructFactory getFactory()
+ public void error(ProtocolError error)
{
- return factory;
+ throw new RuntimeException(error.getMessage());
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
new file mode 100644
index 0000000000..fa35e9332f
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpidity.Functions.*;
+
+
+/**
+ * ConsoleOutput
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ConsoleOutput implements Handler<ByteBuffer>
+{
+
+ public void handle(ByteBuffer buf)
+ {
+ System.out.println(str(buf));
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
index ea73a3574d..241d265bc4 100644
--- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java
@@ -29,12 +29,13 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class ContentHandler<C extends DelegateResolver<C>> extends TypeSwitch<C>
+class ContentHandler<C> extends TypeSwitch<C>
{
- public ContentHandler()
+ public ContentHandler(StructFactory factory, DelegateResolver<C> resolver)
{
- map(Frame.METHOD, new SegmentAssembler<C>(new MethodDispatcher<C>()));
+ MethodDispatcher<C> md = new MethodDispatcher<C>(factory, resolver);
+ map(Frame.METHOD, new SegmentAssembler<C>(md));
map(Frame.HEADER, new SegmentAssembler<C>(new HeaderHandler<C>()));
map(Frame.BODY, new BodyHandler<C>());
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Encoder.java b/java/common/src/main/java/org/apache/qpidity/Encoder.java
index 716df97397..954b690dc2 100644
--- a/java/common/src/main/java/org/apache/qpidity/Encoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/Encoder.java
@@ -49,4 +49,6 @@ public interface Encoder
void writeContent(String c);
+ void flush();
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java
new file mode 100644
index 0000000000..dcf1c19c38
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+import java.util.Iterator;
+
+import static java.lang.Math.*;
+
+
+/**
+ * FragmentDecoder
+ *
+ * @author Rafael H. Schloming
+ */
+
+class FragmentDecoder extends AbstractDecoder
+{
+
+ private final Iterator<ByteBuffer> fragments;
+ private ByteBuffer current;
+
+ public FragmentDecoder(Iterator<ByteBuffer> fragments)
+ {
+ this.fragments = fragments;
+ this.current = null;
+ }
+
+ private void preRead()
+ {
+ if (current == null)
+ {
+ if (!fragments.hasNext())
+ {
+ throw new BufferUnderflowException();
+ }
+
+ current = fragments.next();
+ }
+ }
+
+ private void postRead()
+ {
+ if (current.remaining() == 0)
+ {
+ current = null;
+ }
+ }
+
+ @Override protected byte get()
+ {
+ preRead();
+ byte b = current.get();
+ postRead();
+ return b;
+ }
+
+ @Override protected void get(byte[] bytes)
+ {
+ int remaining = bytes.length;
+ while (remaining > 0)
+ {
+ preRead();
+ int size = min(remaining, current.remaining());
+ current.get(bytes, 0, size);
+ remaining -= size;
+ postRead();
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/Frame.java
index d1bebd8365..1c8bb64fcf 100644
--- a/java/common/src/main/java/org/apache/qpidity/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/Frame.java
@@ -21,7 +21,13 @@
package org.apache.qpidity;
import java.nio.ByteBuffer;
-import java.nio.ShortBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+
+import static org.apache.qpidity.Functions.*;
+
/**
* Frame
@@ -29,103 +35,127 @@ import java.nio.ShortBuffer;
* @author Rafael H. Schloming
*/
-class Frame
+class Frame implements Iterable<ByteBuffer>
{
+ public static final int HEADER_SIZE = 12;
+
+ // XXX: enums?
+ public static final byte L1 = 0;
+ public static final byte L2 = 1;
+ public static final byte L3 = 2;
+ public static final byte L4 = 3;
+
+ public static final byte METHOD = 1;
+ public static final byte HEADER = 2;
+ public static final byte BODY = 3;
+
+ public static final byte RESERVED = 0x0;
+
+ public static final byte FIRST_SEG = 0x8;
+ public static final byte LAST_SEG = 0x4;
+ public static final byte FIRST_FRAME = 0x2;
+ public static final byte LAST_FRAME = 0x1;
- public static final short L1 = 0;
- public static final short L2 = 1;
- public static final short L3 = 2;
- public static final short L4 = 3;
-
- public static final short METHOD = 1;
- public static final short HEADER = 2;
- public static final short BODY = 3;
-
- final private short channel;
- final private short track;
- final private short type;
- final private boolean firstSegment;
- final private boolean lastSegment;
- final private boolean firstFrame;
- final private boolean lastFrame;
- final private ByteBuffer payload;
-
- // XXX
- final private int sequence = 0;
-
- public Frame(short channel, short track, short type, boolean firstSegment,
- boolean lastSegment, boolean firstFrame, boolean lastFrame,
- ByteBuffer payload)
+ final private byte flags;
+ final private byte type;
+ final private byte track;
+ final private int channel;
+ final private List<ByteBuffer> fragments;
+ private int size;
+
+ public Frame(byte flags, byte type, byte track, int channel)
{
- this.channel = channel;
- this.track = track;
+ this.flags = flags;
this.type = type;
- this.firstSegment = firstSegment;
- this.lastSegment = lastSegment;
- this.firstFrame = firstFrame;
- this.lastFrame = lastFrame;
- this.payload = payload;
+ this.track = track;
+ this.channel = channel;
+ this.size = 0;
+ this.fragments = new ArrayList<ByteBuffer>();
}
- public short getChannel()
+ public void addFragment(ByteBuffer fragment)
+ {
+ fragments.add(fragment);
+ size += fragment.remaining();
+ }
+
+ public int getChannel()
{
return channel;
}
- public short getTrack()
+ public int getSize()
{
- return track;
+ return size;
}
- public short getType()
+ public byte getType()
{
return type;
}
+ public byte getTrack()
+ {
+ return track;
+ }
+
+ private boolean flag(byte mask)
+ {
+ return (flags & mask) != 0;
+ }
+
public boolean isFirstSegment()
{
- return firstSegment;
+ return flag(FIRST_SEG);
}
public boolean isLastSegment()
{
- return lastSegment;
+ return flag(LAST_SEG);
}
public boolean isFirstFrame()
{
- return firstFrame;
+ return flag(FIRST_FRAME);
}
public boolean isLastFrame()
{
- return lastFrame;
+ return flag(LAST_FRAME);
}
- public ByteBuffer getPayload()
+ public Iterator<ByteBuffer> getFragments()
{
- return payload.slice();
+ return new SliceIterator(fragments.iterator());
}
- public int getSize()
+ public Iterator<ByteBuffer> iterator()
{
- return payload.remaining();
+ return getFragments();
}
public String toString()
{
StringBuilder str = new StringBuilder();
str.append(String.format
- ("[%05d %05d %1d %1d %d%d%d%d]", channel, getSize(), track, type,
- firstSegment ? 1 : 0, lastSegment ? 1 : 0,
- firstFrame ? 1 : 0, lastFrame ? 1 : 0));
- ShortBuffer shorts = payload.asShortBuffer();
- for (int i = 0; i < shorts.limit(); i++) {
- str.append(String.format(" %04x", shorts.get(i)));
- if (str.length() > 70) {
- str.append(" ...");
- break;
+ ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(),
+ getTrack(), getType(),
+ isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
+ isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
+
+ boolean first = true;
+ for (ByteBuffer buf : this)
+ {
+ if (first)
+ {
+ first = false;
+ }
+ else
+ {
+ str.append(" | ");
}
+
+ str.append(str(buf));
}
return str.toString();
diff --git a/java/common/src/main/java/org/apache/qpidity/Functions.java b/java/common/src/main/java/org/apache/qpidity/Functions.java
new file mode 100644
index 0000000000..f5217bf038
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/Functions.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Functions
+ *
+ * @author Rafael H. Schloming
+ */
+
+class Functions
+{
+
+ public static final short unsigned(byte b)
+ {
+ return (short) ((0x100 + b) & 0xFF);
+ }
+
+ public static final int unsigned(short s)
+ {
+ return (0x10000 + s) & 0xFFFF;
+ }
+
+ public static final long unsigned(int i)
+ {
+ return (0x1000000000L + i) & 0xFFFFFFFFL;
+ }
+
+ public static final byte lsb(int i)
+ {
+ return (byte) (0xFF & i);
+ }
+
+ public static final byte lsb(long l)
+ {
+ return (byte) (0xFF & l);
+ }
+
+ public static final String str(ByteBuffer buf)
+ {
+ return str(buf, buf.limit());
+ }
+
+ public static final String str(ByteBuffer buf, int limit)
+ {
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < limit; i++)
+ {
+ if (i > 0 && i % 2 == 0)
+ {
+ str.append(" ");
+ }
+ str.append(String.format("%02x", buf.get(i)));
+ }
+
+ return str.toString();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/InputHandler.java
new file mode 100644
index 0000000000..5cd4fe87c1
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/InputHandler.java
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpidity.InputHandler.State.*;
+
+
+/**
+ * InputHandler
+ *
+ * @author Rafael H. Schloming
+ */
+
+class InputHandler implements Handler<ByteBuffer>
+{
+
+ enum State
+ {
+ PROTO_HDR,
+ PROTO_HDR_M,
+ PROTO_HDR_Q,
+ PROTO_HDR_P,
+ PROTO_HDR_CLASS,
+ PROTO_HDR_INSTANCE,
+ PROTO_HDR_MAJOR,
+ PROTO_HDR_MINOR,
+ FRAME_HDR,
+ FRAME_HDR_TYPE,
+ FRAME_HDR_SIZE1,
+ FRAME_HDR_SIZE2,
+ FRAME_HDR_RSVD1,
+ FRAME_HDR_TRACK,
+ FRAME_HDR_CH1,
+ FRAME_HDR_CH2,
+ FRAME_HDR_RSVD2,
+ FRAME_HDR_RSVD3,
+ FRAME_HDR_RSVD4,
+ FRAME_HDR_RSVD5,
+ FRAME_PAYLOAD,
+ FRAME_FRAGMENT,
+ ERROR;
+ }
+
+ private final ProtocolActions actions;
+ private State state;
+
+ private byte instance;
+ private byte major;
+ private byte minor;
+
+ private byte flags;
+ private byte type;
+ private byte track;
+ private int channel;
+ private int size;
+ private Frame frame;
+
+ public InputHandler(ProtocolActions actions, State state)
+ {
+ this.actions = actions;
+ this.state = state;
+ }
+
+ public InputHandler(ProtocolActions actions)
+ {
+ this(actions, PROTO_HDR);
+ }
+
+ private void init()
+ {
+ actions.init(new ProtocolHeader(instance, major, minor));
+ }
+
+ private void frame()
+ {
+ assert size == frame.getSize();
+ actions.frame(frame);
+ frame = null;
+ }
+
+ private void error(String fmt, Object ... args)
+ {
+ actions.error(new ProtocolError(fmt, args));
+ }
+
+ public void handle(ByteBuffer buf)
+ {
+ while (buf.hasRemaining())
+ {
+ state = next(buf);
+ }
+ }
+
+ private State next(ByteBuffer buf)
+ {
+ switch (state) {
+ case PROTO_HDR:
+ return expect(buf, 'A', PROTO_HDR_M);
+ case PROTO_HDR_M:
+ return expect(buf, 'M', PROTO_HDR_Q);
+ case PROTO_HDR_Q:
+ return expect(buf, 'Q', PROTO_HDR_P);
+ case PROTO_HDR_P:
+ return expect(buf, 'P', PROTO_HDR_CLASS);
+ case PROTO_HDR_CLASS:
+ return expect(buf, 1, PROTO_HDR_INSTANCE);
+ case PROTO_HDR_INSTANCE:
+ instance = buf.get();
+ return PROTO_HDR_MAJOR;
+ case PROTO_HDR_MAJOR:
+ major = buf.get();
+ return PROTO_HDR_MINOR;
+ case PROTO_HDR_MINOR:
+ minor = buf.get();
+ init();
+ return FRAME_HDR;
+ case FRAME_HDR:
+ flags = buf.get();
+ return FRAME_HDR_TYPE;
+ case FRAME_HDR_TYPE:
+ type = buf.get();
+ return FRAME_HDR_SIZE1;
+ case FRAME_HDR_SIZE1:
+ size = buf.get() << 8;
+ return FRAME_HDR_SIZE2;
+ case FRAME_HDR_SIZE2:
+ size += buf.get();
+ size -= 12;
+ return FRAME_HDR_RSVD1;
+ case FRAME_HDR_RSVD1:
+ return expect(buf, 0, FRAME_HDR_TRACK);
+ case FRAME_HDR_TRACK:
+ byte b = buf.get();
+ if ((b & 0xF0) != 0) {
+ error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: '%x'", b);
+ return ERROR;
+ } else {
+ track = (byte) (b & 0xF);
+ return FRAME_HDR_CH1;
+ }
+ case FRAME_HDR_CH1:
+ channel = buf.get() << 8;
+ return FRAME_HDR_CH2;
+ case FRAME_HDR_CH2:
+ channel += buf.get();
+ return FRAME_HDR_RSVD2;
+ case FRAME_HDR_RSVD2:
+ return expect(buf, 0, FRAME_HDR_RSVD3);
+ case FRAME_HDR_RSVD3:
+ return expect(buf, 0, FRAME_HDR_RSVD4);
+ case FRAME_HDR_RSVD4:
+ return expect(buf, 0, FRAME_HDR_RSVD5);
+ case FRAME_HDR_RSVD5:
+ return expect(buf, 0, FRAME_PAYLOAD);
+ case FRAME_PAYLOAD:
+ frame = new Frame(flags, type, track, channel);
+ if (size > buf.remaining()) {
+ frame.addFragment(buf.slice());
+ buf.position(buf.limit());
+ return FRAME_FRAGMENT;
+ } else {
+ ByteBuffer payload = buf.slice();
+ payload.limit(size);
+ buf.position(buf.position() + size);
+ frame.addFragment(payload);
+ frame();
+ return FRAME_HDR;
+ }
+ case FRAME_FRAGMENT:
+ int delta = size - frame.getSize();
+ if (delta > buf.remaining()) {
+ frame.addFragment(buf.slice());
+ buf.position(buf.limit());
+ return FRAME_FRAGMENT;
+ } else {
+ ByteBuffer fragment = buf.slice();
+ fragment.limit(delta);
+ buf.position(buf.position() + delta);
+ frame.addFragment(fragment);
+ frame();
+ return FRAME_HDR;
+ }
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private State expect(ByteBuffer buf, int expected, State next)
+ {
+ return expect(buf, (byte) expected, next);
+ }
+
+ private State expect(ByteBuffer buf, char expected, State next)
+ {
+ return expect(buf, (byte) expected, next);
+ }
+
+ private State expect(ByteBuffer buf, byte expected, State next)
+ {
+ byte b = buf.get();
+ if (b == expected) {
+ return next;
+ } else {
+ error("expecting '%x', got '%x'", expected, b);
+ return ERROR;
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/Method.java
index 75e3bdfbaa..a3926b572e 100644
--- a/java/common/src/main/java/org/apache/qpidity/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/Method.java
@@ -31,4 +31,8 @@ interface Method extends Struct {
int getEncodedType();
+ byte getEncodedTrack();
+
+ byte getSegmentType();
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
index a2a4ebb0fc..8fbc6f547d 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
@@ -22,26 +22,35 @@ package org.apache.qpidity;
import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+
/**
* A MethodDispatcher parses and dispatches a method segment.
*
* @author Rafael H. Schloming
*/
-class MethodDispatcher<C extends DelegateResolver<C>>
- implements Handler<Event<C,Segment>>
+class MethodDispatcher<C> implements Handler<Event<C,Segment>>
{
- // XXX: should be passed in
- final private StructFactory factory = new StructFactory_v0_10();
+ final private StructFactory factory;
+ final private DelegateResolver<C> resolver;
+
+ public MethodDispatcher(StructFactory factory, DelegateResolver<C> resolver)
+ {
+ this.factory = factory;
+ this.resolver = resolver;
+ }
public void handle(Event<C,Segment> event)
{
System.out.println("got method segment:\n " + event.target);
- ByteBuffer bb = event.target.getPayload();
- int type = bb.getInt();
- Struct struct = factory.create(type, new BBDecoder(bb));
- Delegate<C> delegate = event.context.resolve(struct);
+ Iterator<ByteBuffer> fragments = event.target.getFragments();
+ Decoder dec = new FragmentDecoder(fragments);
+ int type = (int) dec.readLong();
+ Struct struct = factory.create(type, dec);
+ Delegate<C> delegate = resolver.resolve(struct);
struct.delegate(event.context, delegate);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
index ebc7471622..8f43f4c06f 100644
--- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
+++ b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java
@@ -29,12 +29,13 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class MethodHandler<C extends DelegateResolver<C>> extends TypeSwitch<C>
+class MethodHandler<C> extends TypeSwitch<C>
{
- public MethodHandler()
+ public MethodHandler(StructFactory factory, DelegateResolver<C> resolver)
{
- map(Frame.METHOD, new SegmentAssembler<C>(new MethodDispatcher<C>()));
+ MethodDispatcher md = new MethodDispatcher<C>(factory, resolver);
+ map(Frame.METHOD, new SegmentAssembler<C>(md));
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
new file mode 100644
index 0000000000..b54f53dd80
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+
+
+/**
+ * MinaHandler
+ *
+ * @author Rafael H. Schloming
+ */
+
+class MinaHandler implements IoHandler
+{
+
+ public void messageReceived(IoSession ssn, Object obj)
+ {
+ Connection conn = (Connection) ssn.getAttachment();
+ ByteBuffer buf = (ByteBuffer) obj;
+ conn.getInputHandler().handle(buf.buf());
+ }
+
+ public void messageSent(IoSession ssn, Object obj)
+ {
+ System.out.println("TX: " + obj);
+ }
+
+ public void exceptionCaught(IoSession ssn, Throwable e)
+ {
+ e.printStackTrace();
+ }
+
+ public void sessionCreated(IoSession ssn)
+ {
+ System.out.println("created " + ssn);
+ }
+
+ public void sessionOpened(final IoSession ssn)
+ {
+ Connection conn = new Connection(new Handler<java.nio.ByteBuffer>()
+ {
+ public void handle(java.nio.ByteBuffer buf)
+ {
+ ssn.write(ByteBuffer.wrap(buf));
+ }
+ });
+ ssn.setAttachment(conn);
+ }
+
+ public void sessionClosed(IoSession ssn)
+ {
+ System.out.println("closed " + ssn);
+ ssn.setAttachment(null);
+ }
+
+ public void sessionIdle(IoSession ssn, IdleStatus status)
+ {
+ System.out.println(status);
+ }
+
+ public static final void main(String[] args) throws IOException
+ {
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ if (args[0].equals("accept")) {
+ accept(args);
+ } else if (args[0].equals("connect")) {
+ connect(args);
+ }
+ }
+
+ public static final void accept(String[] args) throws IOException
+ {
+ IoAcceptor acceptor = new SocketAcceptor();
+ acceptor.bind(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler());
+ }
+
+ public static final void connect(String[] args)
+ {
+ IoConnector connector = new SocketConnector();
+ ConnectFuture cf = connector.connect(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler());
+ cf.join();
+ IoSession ssn = cf.getSession();
+ ByteBuffer bb = ByteBuffer.allocate(1024);
+ bb.put("AMQP".getBytes());
+ bb.flip();
+ for (int i = 0; i < 10; i++) {
+ ssn.write(bb);
+ }
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java b/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java
new file mode 100644
index 0000000000..c658320989
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * ProtocolActions
+ *
+ * @author Rafael H. Schloming
+ */
+
+interface ProtocolActions
+{
+
+ void init(ProtocolHeader header);
+
+ void frame(Frame frame);
+
+ void error(ProtocolError error);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolError.java b/java/common/src/main/java/org/apache/qpidity/ProtocolError.java
new file mode 100644
index 0000000000..a4a83fad35
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolError.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * ProtocolError
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ProtocolError
+{
+
+ private final String format;
+ private final Object[] args;
+
+ public ProtocolError(String format, Object ... args)
+ {
+ this.format = format;
+ this.args = args;
+ }
+
+ public String getMessage()
+ {
+ return String.format(format, args);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java
new file mode 100644
index 0000000000..526ef50211
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * ProtocolHandler
+ *
+ * @author Rafael H. Schloming
+ */
+
+interface ProtocolHandler
+{
+
+ void init(ProtocolHeader header);
+
+ void frame(Frame frame);
+
+ void error(ProtocolError error);
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
new file mode 100644
index 0000000000..1f12e285e4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * ProtocolHeader
+ *
+ * @author Rafael H. Schloming
+ */
+
+class ProtocolHeader
+{
+
+ final private byte instance;
+ final private byte major;
+ final private byte minor;
+
+ public ProtocolHeader(byte instance, byte major, byte minor)
+ {
+ this.instance = instance;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public byte getInstance()
+ {
+ return instance;
+ }
+
+ public byte getMajor()
+ {
+ return major;
+ }
+
+ public byte getMinor()
+ {
+ return minor;
+ }
+
+ public String toString()
+ {
+ return String.format("AMQP.%d %d-%d", instance, major, minor);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Segment.java b/java/common/src/main/java/org/apache/qpidity/Segment.java
index cd1d4ab05d..ee9f60fad8 100644
--- a/java/common/src/main/java/org/apache/qpidity/Segment.java
+++ b/java/common/src/main/java/org/apache/qpidity/Segment.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpidity;
-import java.nio.ByteBuffer;
-
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpidity.Functions.*;
+
/**
* Segment
@@ -31,50 +35,40 @@ import java.util.Collection;
* @author Rafael H. Schloming
*/
-class Segment
+class Segment implements Iterable<ByteBuffer>
{
- private final Collection<Frame> frames = new ArrayList<Frame>();
+ private final Collection<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
- public void add(Frame frame)
+ public void add(ByteBuffer fragment)
{
- frames.add(frame);
+ fragments.add(fragment);
}
- public ByteBuffer getPayload()
+ public Iterator<ByteBuffer> getFragments()
{
- // we should probably use our own decoder interface here so
- // that we can directly read from the incoming frame objects
- // and automatically skip frame boundaries without copying
- // everything in order to get a contiguous byte buffer
- int capacity = 0;
- for (Frame frame : frames)
- {
- capacity += frame.getSize();
- }
- ByteBuffer buf = ByteBuffer.allocate(capacity);
- for (Frame frame : frames)
- {
- buf.put(frame.getPayload());
- }
- buf.flip();
- return buf;
+ return new SliceIterator(fragments.iterator());
+ }
+
+ public Iterator<ByteBuffer> iterator()
+ {
+ return getFragments();
}
public String toString()
{
- StringBuffer buf = new StringBuffer();
- String sep = ",\n ";
+ StringBuilder str = new StringBuilder();
+ String sep = " | ";
- for (Frame f : frames)
+ for (ByteBuffer buf : this)
{
- buf.append(f.toString());
- buf.append(sep);
+ str.append(str(buf));
+ str.append(sep);
}
- buf.setLength(buf.length() - sep.length());
+ str.setLength(str.length() - sep.length());
- return buf.toString();
+ return str.toString();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java b/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java
index 2d47e760c1..0fb90a2f53 100644
--- a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java
+++ b/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java
@@ -22,6 +22,7 @@ package org.apache.qpidity;
import java.nio.ByteBuffer;
+
/**
* SegmentAssembler is a stateful handler that aggregates Frame events
* into Segment events. This should only be used where it is necessary
@@ -50,7 +51,10 @@ class SegmentAssembler<C> implements Handler<Event<C,Frame>>
segment = new Segment();
}
- segment.add(frame);
+ for (ByteBuffer fragment : frame)
+ {
+ segment.add(fragment);
+ }
if (frame.isLastFrame())
{
diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java
new file mode 100644
index 0000000000..7d4e5aabe4
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static java.lang.Math.*;
+
+import static org.apache.qpidity.Frame.*;
+
+
+/**
+ * SegmentEncoder
+ *
+ * @author Rafael H. Schloming
+ */
+
+class SegmentEncoder extends AbstractEncoder
+{
+
+ private final Handler<ByteBuffer> handler;
+ private final int max;
+ private final byte flags;
+ private final byte track;
+ private final byte type;
+ private final int channel;
+
+ private int remaining;
+ private ByteBuffer frame;
+ private boolean first;
+
+ public SegmentEncoder(Handler<ByteBuffer> handler, int max,
+ byte flags,
+ byte track,
+ byte type,
+ int channel,
+ int remaining)
+ {
+ if (max < HEADER_SIZE + 1)
+ {
+ throw new IllegalArgumentException
+ ("max frame size must be large enough to include header");
+ }
+
+ this.handler = handler;
+ this.max = max;
+ this.flags = flags;
+ this.track = track;
+ this.type = type;
+ this.channel = channel;
+ this.remaining = remaining;
+ this.frame = null;
+ this.first = true;
+ }
+
+ private void preWrite() {
+ if (remaining == 0)
+ {
+ throw new BufferOverflowException();
+ }
+
+ if (frame == null)
+ {
+ frame = ByteBuffer.allocate(min(max, remaining + HEADER_SIZE));
+ frame.order(ByteOrder.BIG_ENDIAN);
+
+ byte frameFlags = flags;
+ if (first) { frameFlags |= FIRST_FRAME; first = false; }
+ if (remaining <= (frame.remaining() - HEADER_SIZE))
+ {
+ frameFlags |= LAST_FRAME;
+ }
+
+ frame.put(frameFlags);
+ frame.put(type);
+ frame.putShort((short) frame.limit());
+ frame.put(RESERVED);
+ frame.put(track);
+ frame.putShort((short) channel);
+ frame.put(RESERVED);
+ frame.put(RESERVED);
+ frame.put(RESERVED);
+ frame.put(RESERVED);
+
+ assert frame.position() == HEADER_SIZE;
+ }
+ }
+
+ private void postWrite() {
+ if (!frame.hasRemaining())
+ {
+ frame.flip();
+ handler.handle(frame);
+ frame = null;
+ }
+ }
+
+ @Override protected void put(byte b)
+ {
+ preWrite();
+ frame.put(b);
+ remaining -= 1;
+ postWrite();
+ }
+
+ @Override protected void put(ByteBuffer src)
+ {
+ if (src.remaining() > remaining)
+ {
+ throw new BufferOverflowException();
+ }
+
+ while (src.hasRemaining())
+ {
+ preWrite();
+ int limit = src.limit();
+ src.limit(src.position() + min(frame.remaining(), src.remaining()));
+ remaining -= src.remaining();
+ frame.put(src);
+ src.limit(limit);
+ postWrite();
+ }
+ }
+
+ public static final void main(String[] args) {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ buf.put("AMQP_PROTOCOL_HEADER".getBytes());
+ buf.flip();
+
+ SegmentEncoder enc = new SegmentEncoder(new Handler<ByteBuffer>()
+ {
+ public void handle(ByteBuffer frame)
+ {
+ System.out.println(frame);
+ }
+ },
+ 16,
+ (byte) 0x0,
+ (byte) Frame.L1,
+ (byte) Frame.METHOD,
+ 0,
+ 7 + buf.remaining());
+ enc.put((byte)0);
+ enc.put((byte)1);
+ enc.put((byte)2);
+ enc.put((byte)3);
+ enc.put((byte)4);
+ enc.put((byte)5);
+ enc.put((byte)6);
+ enc.put(buf);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java
index 32da625070..7d74849b0b 100644
--- a/java/common/src/main/java/org/apache/qpidity/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/Session.java
@@ -29,7 +29,7 @@ import java.util.Map;
* @author Rafael H. Schloming
*/
-public class Session extends Invoker implements DelegateResolver<Session>
+public class Session extends Invoker
{
// channel may be null
@@ -37,7 +37,6 @@ public class Session extends Invoker implements DelegateResolver<Session>
private int command_id = 0;
// XXX
final Map<Integer,Handler<Struct>> handlers = new HashMap<Integer,Handler<Struct>>();
- final private Delegate<Session> delegate = new SessionDelegate();
public void attach(Channel channel)
{
@@ -62,9 +61,4 @@ public class Session extends Invoker implements DelegateResolver<Session>
return channel.getFactory();
}
- public Delegate<Session> resolve(Struct struct)
- {
- return delegate;
- }
-
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java b/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java
new file mode 100644
index 0000000000..fe52f0c90b
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+
+/**
+ * SimpleDelegateResolver
+ *
+ * @author Rafael H. Schloming
+ */
+
+class SimpleDelegateResolver<C> implements DelegateResolver<C>
+{
+
+ private final Delegate<C> delegate;
+
+ public SimpleDelegateResolver(Delegate<C> delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public Delegate<C> resolve(Struct struct)
+ {
+ return delegate;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java
index 745f58fd6d..b6ecdd2818 100644
--- a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java
@@ -23,7 +23,7 @@ package org.apache.qpidity;
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.UUID;
+
/**
* SizeEncoder
@@ -31,7 +31,7 @@ import java.util.UUID;
* @author Rafael H. Schloming
*/
-class SizeEncoder implements Encoder
+class SizeEncoder extends AbstractEncoder
{
private int size;
@@ -52,39 +52,17 @@ class SizeEncoder implements Encoder
this.size = size;
}
- public void writeBit(boolean b)
- {
- //throw new Error("TODO");
- }
-
- public void writeOctet(short b)
+ @Override protected void put(byte b)
{
size += 1;
}
- public void writeShort(int s)
+ @Override protected void put(ByteBuffer src)
{
- size += 2;
+ size += src.remaining();
}
- public void writeLong(long i)
- {
- size += 4;
- }
-
- public void writeLonglong(long l)
- {
- size += 8;
- }
-
-
- public void writeTimestamp(long l)
- {
- size += 8;
- }
-
-
- public void writeShortstr(String s)
+ @Override public void writeShortstr(String s)
{
if (s.length() > 255) {
throw new IllegalArgumentException(s);
@@ -93,30 +71,10 @@ class SizeEncoder implements Encoder
size += s.length();
}
- public void writeLongstr(String s)
+ @Override public void writeLongstr(String s)
{
- throw new Error("TODO");
- }
-
-
- public void writeTable(Map<String,?> table)
- {
- //throw new Error("TODO");
- }
-
- public void writeRfc1982LongSet(Range<Long>[] ranges)
- {
- throw new Error("TODO");
- }
-
- public void writeUuid(UUID uuid)
- {
- throw new Error("TODO");
- }
-
- public void writeContent(String c)
- {
- throw new Error("TODO");
+ writeLong(s.length());
+ size += s.length();
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/SliceIterator.java b/java/common/src/main/java/org/apache/qpidity/SliceIterator.java
new file mode 100644
index 0000000000..9b4a8f90f7
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpidity/SliceIterator.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity;
+
+import java.nio.ByteBuffer;
+
+import java.util.Iterator;
+
+
+/**
+ * SliceIterator
+ *
+ * @author Rafael H. Schloming
+ */
+
+class SliceIterator implements Iterator<ByteBuffer>
+{
+
+ final private Iterator<ByteBuffer> iterator;
+
+ public SliceIterator(Iterator<ByteBuffer> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ public ByteBuffer next()
+ {
+ return iterator.next().slice();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpidity/Stub.java b/java/common/src/main/java/org/apache/qpidity/Stub.java
index be246d325b..e27fc89a59 100644
--- a/java/common/src/main/java/org/apache/qpidity/Stub.java
+++ b/java/common/src/main/java/org/apache/qpidity/Stub.java
@@ -1,32 +1,47 @@
package org.apache.qpidity;
-import java.nio.ByteBuffer;
import java.util.*;
import java.lang.annotation.*;
+import java.nio.ByteBuffer;
+
import static org.apache.qpidity.Option.*;
+
public class Stub {
- private static Connection conn = new Connection();
+ private static Connection conn = new Connection(new ConsoleOutput());
+
+ static
+ {
+ conn.init(new ProtocolHeader((byte) 1, (byte) 0, (byte) 10));
+ }
- private static void frame(short track, short type, boolean first, boolean last) {
+ private static void frame(byte track, byte type, boolean first, boolean last) {
frame(track, type, first, last, null);
}
- private static void frame(short track, short type, boolean first, boolean last, Method m) {
+ private static void frame(byte track, byte type, boolean first, boolean last, Method m) {
SizeEncoder sizer = new SizeEncoder();
if (m != null) {
+ sizer.writeLong(m.getEncodedType());
m.write(sizer);
+ sizer.flush();
}
- ByteBuffer buf = ByteBuffer.allocate(sizer.getSize() + 4);
+ ByteBuffer buf = ByteBuffer.allocate(sizer.getSize());
if (m != null) {
- buf.putInt(m.getEncodedType());
- m.write(new BBEncoder(buf));
+ Encoder enc = new BBEncoder(buf);
+ enc.writeLong(m.getEncodedType());
+ m.write(enc);
+ enc.flush();
}
buf.flip();
- Frame frame = new Frame((short)0, track, type, true, true, first, last, buf);
- conn.handle(frame);
+ byte flags = 0;
+ if (first) { flags |= Frame.FIRST_FRAME; }
+ if (last) { flags |= Frame.LAST_FRAME; }
+ Frame frame = new Frame(flags, type, track, 0);
+ frame.addFragment(buf);
+ conn.frame(frame);
}
public static final void main(String[] args) {
@@ -59,7 +74,16 @@ class ChannelDelegate extends Delegate<Channel> {
public @Override void sessionOpen(Channel channel, SessionOpen open) {
Session ssn = new Session();
ssn.attach(channel);
- System.out.println("Session Open");
+ long lifetime = open.getDetachedlifetime();
+ System.out.println("Session Opened lifetime = " + lifetime);
+ try
+ {
+ ssn.sessionAttached(UUID.randomUUID(), lifetime);
+ }
+ catch (QpidException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
@@ -74,11 +98,11 @@ class SessionDelegate extends Delegate<Session> {
System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType());
try
{
- session.queueDeclare("asdf", "alternate", null);
+ session.queueDeclare("asdf", "alternate", null);
}
- catch(Exception e)
+ catch(QpidException e)
{
-
+ throw new RuntimeException(e);
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/Switch.java b/java/common/src/main/java/org/apache/qpidity/Switch.java
index a84ae1507b..166dc33134 100644
--- a/java/common/src/main/java/org/apache/qpidity/Switch.java
+++ b/java/common/src/main/java/org/apache/qpidity/Switch.java
@@ -44,6 +44,12 @@ abstract class Switch<K,E> implements Handler<E>
{
K key = resolve(event);
Handler<E> handler = handlers.get(key);
+ if (handler == null)
+ {
+ throw new IllegalStateException("no such key: " + key +
+ " this = " + this +
+ " handlers = " + handlers);
+ }
handler.handle(event);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java b/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java
index c9b3cda032..28a7d75f05 100644
--- a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java
+++ b/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java
@@ -28,10 +28,10 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class TrackSwitch<C> extends Switch<Short,Event<C,Frame>>
+class TrackSwitch<C> extends Switch<Byte,Event<C,Frame>>
{
- public Short resolve(Event<C,Frame> event)
+ public Byte resolve(Event<C,Frame> event)
{
return event.target.getTrack();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java b/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java
index a5073486c7..fc53b0b9b4 100644
--- a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java
+++ b/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java
@@ -28,10 +28,10 @@ package org.apache.qpidity;
* @author Rafael H. Schloming
*/
-class TypeSwitch<C> extends Switch<Short,Event<C,Frame>>
+class TypeSwitch<C> extends Switch<Byte,Event<C,Frame>>
{
- public Short resolve(Event<C,Frame> event)
+ public Byte resolve(Event<C,Frame> event)
{
return event.target.getType();
}