diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-08-01 05:28:01 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-08-01 05:28:01 +0000 |
| commit | 7c16ea45a0193861868f27a53403995d57e49e66 (patch) | |
| tree | 33c9d6d2814818d07a50ed28cc384293a8de5fe5 /java/common | |
| parent | 92de1f9135b949a9b75f818190ad1f010e336d78 (diff) | |
| download | qpid-python-7c16ea45a0193861868f27a53403995d57e49e66.tar.gz | |
plumbed the input/output into MINA and filled out more of the encoding/decoding
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561665 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
34 files changed, 1714 insertions, 368 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java new file mode 100644 index 0000000000..42da09f2d9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/AbstractDecoder.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.util.Map; +import java.util.UUID; + +import static org.apache.qpidity.Functions.*; + + +/** + * AbstractDecoder + * + * @author Rafael H. Schloming + */ + +abstract class AbstractDecoder implements Decoder +{ + + protected abstract byte get(); + + protected abstract void get(byte[] bytes); + + private byte bits = 0x0; + private byte nbits = 0; + + public boolean readBit() + { + if (nbits == 0) + { + bits = get(); + nbits = 8; + } + + nbits -= 1; + + boolean result = ((bits >>> nbits) & 0x01) != 0; + return result; + } + + private void clearBits() + { + bits = 0x0; + nbits = 0; + } + + public short readOctet() + { + clearBits(); + return unsigned(get()); + } + + public int readShort() + { + clearBits(); + int i = get() << 8; + i |= get(); + return i; + } + + public long readLong() + { + clearBits(); + long l = get() << 24; + l |= get() << 16; + l |= get() << 8; + l |= get(); + return l; + } + + public long readLonglong() + { + clearBits(); + long l = get() << 56; + l |= get() << 48; + l |= get() << 40; + l |= get() << 32; + l |= get() << 24; + l |= get() << 16; + l |= get() << 8; + l |= get(); + return l; + } + + public long readTimestamp() + { + return readLonglong(); + } + + + public String readShortstr() + { + short size = readOctet(); + byte[] bytes = new byte[size]; + get(bytes); + return new String(bytes); + } + + public String readLongstr() + { + long size = readLong(); + assert size <= Integer.MAX_VALUE; + byte[] bytes = new byte[(int) size]; + get(bytes); + return new String(bytes); + } + + public Map<String,?> readTable() + { + //throw new Error("TODO"); + return null; + } + + public Range<Long>[] readRfc1982LongSet() + { + throw new Error("TODO"); + } + + public UUID readUuid() + { + long msb = readLong(); + long lsb = readLong(); + return new UUID(msb, lsb); + } + + public String readContent() + { + throw new Error("Deprecated"); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java new file mode 100644 index 0000000000..5ca68fe564 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/AbstractEncoder.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.ByteBuffer; + +import java.util.Map; +import java.util.UUID; + +import static org.apache.qpidity.Functions.*; + + +/** + * AbstractEncoder + * + * @author Rafael H. Schloming + */ + +abstract class AbstractEncoder implements Encoder +{ + + protected abstract void put(byte b); + + protected abstract void put(ByteBuffer src); + + private byte bits = 0x0; + private byte nbits = 0; + + public void writeBit(boolean b) + { + if (b) + { + bits |= 1 << nbits; + } + + nbits += 1; + + if (nbits == 8) + { + flushBits(); + } + } + + private void flushBits() + { + if (nbits > 0) + { + put(bits); + bits = 0x0; + nbits = 0; + } + } + + public void writeOctet(short b) + { + assert b < 0x100; + + flushBits(); + put((byte) b); + } + + public void writeShort(int s) + { + assert s < 0x10000; + + flushBits(); + put(lsb(s >> 8)); + put(lsb(s)); + } + + public void writeLong(long i) + { + assert i < 0x100000000L; + + flushBits(); + put(lsb(i >> 24)); + put(lsb(i >> 16)); + put(lsb(i >> 8)); + put(lsb(i)); + } + + public void writeLonglong(long l) + { + flushBits(); + put(lsb(l >> 56)); + put(lsb(l >> 48)); + put(lsb(l >> 40)); + put(lsb(l >> 32)); + put(lsb(l >> 24)); + put(lsb(l >> 16)); + put(lsb(l >> 8)); + put(lsb(l)); + } + + + public void writeTimestamp(long l) + { + flushBits(); + writeLonglong(l); + } + + + public void writeShortstr(String s) + { + if (s.length() > 255) { + throw new IllegalArgumentException(s); + } + writeOctet((short) s.length()); + put(ByteBuffer.wrap(s.getBytes())); + } + + public void writeLongstr(String s) + { + writeLong(s.length()); + put(ByteBuffer.wrap(s.getBytes())); + } + + + public void writeTable(Map<String,?> table) + { + //throw new Error("TODO"); + } + + public void writeRfc1982LongSet(Range<Long>[] ranges) + { + throw new Error("TODO"); + } + + public void writeUuid(UUID uuid) + { + writeLong(uuid.getMostSignificantBits()); + writeLong(uuid.getLeastSignificantBits()); + } + + public void writeContent(String c) + { + throw new Error("Deprecated"); + } + + public void flush() + { + flushBits(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java b/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java index 7db1aeea43..258c33d715 100644 --- a/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java +++ b/java/common/src/main/java/org/apache/qpidity/AbstractMethod.java @@ -20,7 +20,6 @@ */ package org.apache.qpidity; -import java.nio.ByteBuffer; /** * AbstractMethod @@ -28,4 +27,19 @@ import java.nio.ByteBuffer; * @author Rafael H. Schloming */ -abstract class AbstractMethod extends AbstractStruct implements Method {} +abstract class AbstractMethod extends AbstractStruct implements Method { + + public byte getEncodedTrack() + { + // XXX + return Frame.L2; + } + + // XXX: do we need a segment base type? + public byte getSegmentType() + { + // XXX + return Frame.METHOD; + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java index 9c5da34cd1..08ac03113f 100644 --- a/java/common/src/main/java/org/apache/qpidity/BBDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/BBDecoder.java @@ -22,8 +22,6 @@ package org.apache.qpidity; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.UUID; /** * BBDecoder @@ -31,24 +29,9 @@ import java.util.UUID; * @author Rafael H. Schloming */ -class BBDecoder implements Decoder +class BBDecoder extends AbstractDecoder { - private static final short unsigned(byte b) - { - return (short) ((0x100 + b) & 0xFF); - } - - private static final int unsigned(short s) - { - return (0x10000 + s) & 0xFFFF; - } - - private static final long unsigned(int i) - { - return (0x1000000000L + i) & 0xFFFFFFFFL; - } - private final ByteBuffer in; public BBDecoder(ByteBuffer in) @@ -56,74 +39,14 @@ class BBDecoder implements Decoder this.in = in; } - public boolean readBit() - { - //throw new Error("TODO"); - return false; - } - - public short readOctet() - { - return unsigned(in.get()); - } - - public int readShort() - { - return unsigned(in.getShort()); - } - - public long readLong() - { - return unsigned(in.getInt()); - } - - public long readLonglong() + protected byte get() { - throw new Error("TODO"); + return in.get(); } - public long readTimestamp() + protected void get(byte[] bytes) { - throw new Error("TODO"); - } - - - public String readShortstr() - { - short size = readOctet(); - byte[] bytes = new byte[size]; - in.get(bytes); - return new String(bytes); - } - - public String readLongstr() - { - long size = readLong(); - assert size <= Integer.MAX_VALUE; - byte[] bytes = new byte[(int) size]; in.get(bytes); - return new String(bytes); - } - - public Map<String,?> readTable() - { - //throw new Error("TODO"); - return null; - } - - public Range<Long>[] readRfc1982LongSet() - { - throw new Error("TODO"); - } - - public UUID readUuid() - { - throw new Error("TODO"); - } - - public String readContent() - { - throw new Error("TODO"); } } diff --git a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java index 50b8659fd7..947a461ab3 100644 --- a/java/common/src/main/java/org/apache/qpidity/BBEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/BBEncoder.java @@ -22,8 +22,6 @@ package org.apache.qpidity; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.UUID; /** * BBEncoder @@ -31,7 +29,7 @@ import java.util.UUID; * @author Rafael H. Schloming */ -class BBEncoder implements Encoder +class BBEncoder extends AbstractEncoder { private final ByteBuffer out; @@ -40,75 +38,14 @@ class BBEncoder implements Encoder this.out = out; } - public void writeBit(boolean b) + @Override protected void put(byte b) { - //throw new Error("TODO"); + out.put(b); } - public void writeOctet(short b) + @Override protected void put(ByteBuffer src) { - assert b < 0x100; - out.put((byte) b); - } - - public void writeShort(int s) - { - assert s < 0x10000; - out.putShort((short) s); - } - - public void writeLong(long i) - { - assert i < 0x100000000L; - out.putInt((int) i); - } - - public void writeLonglong(long l) - { - throw new Error("TODO"); - } - - - public void writeTimestamp(long l) - { - throw new Error("TODO"); - } - - - public void writeShortstr(String s) - { - if (s.length() > 255) { - throw new IllegalArgumentException(s); - } - writeOctet((short) s.length()); - out.put(s.getBytes()); - } - - public void writeLongstr(String s) - { - writeLong(s.length()); - out.put(s.getBytes()); - } - - - public void writeTable(Map<String,?> table) - { - //throw new Error("TODO"); - } - - public void writeRfc1982LongSet(Range<Long>[] ranges) - { - throw new Error("TODO"); - } - - public void writeUuid(UUID uuid) - { - throw new Error("TODO"); - } - - public void writeContent(String c) - { - throw new Error("TODO"); + out.put(src); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Channel.java b/java/common/src/main/java/org/apache/qpidity/Channel.java index a9060622e8..4f23112da3 100644 --- a/java/common/src/main/java/org/apache/qpidity/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/Channel.java @@ -20,32 +20,43 @@ */ package org.apache.qpidity; - /** * Channel * * @author Rafael H. Schloming */ -class Channel extends Invoker - implements Handler<Frame>, DelegateResolver<Channel> +class Channel extends Invoker implements Handler<Frame> { final private Connection connection; + final private int channel; final private TrackSwitch<Channel> tracks; - // session may be null private Session session; - public Channel(Connection connection) + public Channel(Connection connection, int channel) { this.connection = connection; + this.channel = channel; + + DelegateResolver<Channel> chDR = + new SimpleDelegateResolver<Channel>(new ChannelDelegate()); + DelegateResolver<Session> ssnDR = + new SimpleDelegateResolver<Session>(new SessionDelegate()); + tracks = new TrackSwitch<Channel>(); - tracks.map(Frame.L1, new MethodHandler<Channel>()); - tracks.map(Frame.L2, new MethodHandler<Channel>()); - tracks.map(Frame.L3, new SessionResolver<Frame>(new MethodHandler<Session>())); - tracks.map(Frame.L4, new SessionResolver<Frame>(new ContentHandler<Session>())); + tracks.map(Frame.L1, new MethodHandler<Channel>(getFactory(), chDR)); + tracks.map(Frame.L2, new MethodHandler<Channel>(getFactory(), chDR)); + tracks.map(Frame.L3, new SessionResolver<Frame> + (new MethodHandler<Session>(getFactory(), ssnDR))); + tracks.map(Frame.L4, new SessionResolver<Frame> + (new ContentHandler<Session>(getFactory(), ssnDR))); + } + + public int getEncodedChannel() { + return channel; } public Session getSession() @@ -63,9 +74,25 @@ class Channel extends Invoker tracks.handle(new Event<Channel,Frame>(this, frame)); } - public void write(Writable writable) + public void write(Method m) { - System.out.println("writing: " + writable); + SizeEncoder sizer = new SizeEncoder(); + sizer.writeLong(m.getEncodedType()); + m.write(sizer); + sizer.flush(); + int size = sizer.getSize(); + + // XXX: need to set header flags properly + SegmentEncoder enc = new SegmentEncoder(connection.getOutputHandler(), + connection.getMaxFrame(), + (byte) 0x0, + m.getEncodedTrack(), + m.getSegmentType(), + channel, + size); + enc.writeLong(m.getEncodedType()); + m.write(enc); + enc.flush(); } protected StructFactory getFactory() @@ -73,11 +100,6 @@ class Channel extends Invoker return connection.getFactory(); } - public Delegate<Channel> resolve(Struct struct) - { - return new ChannelDelegate(); - } - protected void invoke(Method m) { write(m); diff --git a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java b/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java index b8efa857f6..e9173a5c5b 100644 --- a/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java +++ b/java/common/src/main/java/org/apache/qpidity/CommonSessionDelegate.java @@ -1,27 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpidity; + +/** + * CommonSessionDelegate + */ + public class CommonSessionDelegate extends Delegate<Session> { - public void sessionAttached(Session session, SessionAttached struct) {} - - public void sessionFlow(Session session, SessionFlow struct) {} - - public void sessionFlowOk(Session session, SessionFlowOk struct) {} - - public void sessionClose(Session session, SessionClose struct) {} - - public void sessionClosed(Session session, SessionClosed struct) {} - - public void sessionResume(Session session, SessionResume struct) {} - - public void sessionPing(Session session, SessionPing struct) {} - - public void sessionPong(Session session, SessionPong struct) {} - - public void sessionSuspend(Session session, SessionSuspend struct) {} - - public void sessionDetached(Session session, SessionDetached struct) {} - + @Override public void sessionAttached(Session session, SessionAttached struct) {} + + @Override public void sessionFlow(Session session, SessionFlow struct) {} + + @Override public void sessionFlowOk(Session session, SessionFlowOk struct) {} + + @Override public void sessionClose(Session session, SessionClose struct) {} + + @Override public void sessionClosed(Session session, SessionClosed struct) {} + + @Override public void sessionResume(Session session, SessionResume struct) {} + + @Override public void sessionPing(Session session, SessionPing struct) {} + + @Override public void sessionPong(Session session, SessionPong struct) {} + + @Override public void sessionSuspend(Session session, SessionSuspend struct) {} + + @Override public void sessionDetached(Session session, SessionDetached struct) {} } diff --git a/java/common/src/main/java/org/apache/qpidity/Connection.java b/java/common/src/main/java/org/apache/qpidity/Connection.java index bb8b945051..7e31ca9b57 100644 --- a/java/common/src/main/java/org/apache/qpidity/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/Connection.java @@ -23,6 +23,9 @@ package org.apache.qpidity; import java.util.HashMap; import java.util.Map; +import java.nio.ByteBuffer; + + /** * Connection * @@ -33,27 +36,79 @@ import java.util.Map; * short instead of Short */ -class Connection implements Handler<Frame> +class Connection implements ProtocolActions { - final private Map<Short,Channel> channels = new HashMap<Short,Channel>(); - final private StructFactory factory = new StructFactory_v0_10(); + final private Handler<ByteBuffer> input; + final private Handler<ByteBuffer> output; + + final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); + private StructFactory factory; + + // XXX + private int maxFrame = 64*1024; + + public Connection(Handler<ByteBuffer> output) + { + this.input = new InputHandler(this); + this.output = output; + } + + public Handler<ByteBuffer> getInputHandler() + { + return input; + } + + public Handler<ByteBuffer> getOutputHandler() + { + return output; + } + + public StructFactory getFactory() + { + return factory; + } + + public int getMaxFrame() + { + return maxFrame; + } - public void handle(Frame frame) + public void init(ProtocolHeader header) + { + System.out.println(header); + // XXX: hardcoded versions + if (header.getMajor() != 0 && header.getMinor() != 10) + { + ByteBuffer buf = ByteBuffer.allocate(8); + buf.put("AMQP".getBytes()); + buf.put((byte) 1); + buf.put((byte) 1); + buf.put((byte) 0); + buf.put((byte) 10); + buf.flip(); + output.handle(buf); + // XXX: how do we close the connection? + } else { + factory = new StructFactory_v0_10(); + } + } + + public void frame(Frame frame) { Channel channel = channels.get(frame.getChannel()); if (channel == null) { - channel = new Channel(this); + channel = new Channel(this, frame.getChannel()); channels.put(frame.getChannel(), channel); } channel.handle(frame); } - public StructFactory getFactory() + public void error(ProtocolError error) { - return factory; + throw new RuntimeException(error.getMessage()); } } diff --git a/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java new file mode 100644 index 0000000000..fa35e9332f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.ByteBuffer; + +import static org.apache.qpidity.Functions.*; + + +/** + * ConsoleOutput + * + * @author Rafael H. Schloming + */ + +class ConsoleOutput implements Handler<ByteBuffer> +{ + + public void handle(ByteBuffer buf) + { + System.out.println(str(buf)); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java index ea73a3574d..241d265bc4 100644 --- a/java/common/src/main/java/org/apache/qpidity/ContentHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/ContentHandler.java @@ -29,12 +29,13 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class ContentHandler<C extends DelegateResolver<C>> extends TypeSwitch<C> +class ContentHandler<C> extends TypeSwitch<C> { - public ContentHandler() + public ContentHandler(StructFactory factory, DelegateResolver<C> resolver) { - map(Frame.METHOD, new SegmentAssembler<C>(new MethodDispatcher<C>())); + MethodDispatcher<C> md = new MethodDispatcher<C>(factory, resolver); + map(Frame.METHOD, new SegmentAssembler<C>(md)); map(Frame.HEADER, new SegmentAssembler<C>(new HeaderHandler<C>())); map(Frame.BODY, new BodyHandler<C>()); } diff --git a/java/common/src/main/java/org/apache/qpidity/Encoder.java b/java/common/src/main/java/org/apache/qpidity/Encoder.java index 716df97397..954b690dc2 100644 --- a/java/common/src/main/java/org/apache/qpidity/Encoder.java +++ b/java/common/src/main/java/org/apache/qpidity/Encoder.java @@ -49,4 +49,6 @@ public interface Encoder void writeContent(String c); + void flush(); + } diff --git a/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java new file mode 100644 index 0000000000..dcf1c19c38 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/FragmentDecoder.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +import java.util.Iterator; + +import static java.lang.Math.*; + + +/** + * FragmentDecoder + * + * @author Rafael H. Schloming + */ + +class FragmentDecoder extends AbstractDecoder +{ + + private final Iterator<ByteBuffer> fragments; + private ByteBuffer current; + + public FragmentDecoder(Iterator<ByteBuffer> fragments) + { + this.fragments = fragments; + this.current = null; + } + + private void preRead() + { + if (current == null) + { + if (!fragments.hasNext()) + { + throw new BufferUnderflowException(); + } + + current = fragments.next(); + } + } + + private void postRead() + { + if (current.remaining() == 0) + { + current = null; + } + } + + @Override protected byte get() + { + preRead(); + byte b = current.get(); + postRead(); + return b; + } + + @Override protected void get(byte[] bytes) + { + int remaining = bytes.length; + while (remaining > 0) + { + preRead(); + int size = min(remaining, current.remaining()); + current.get(bytes, 0, size); + remaining -= size; + postRead(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Frame.java b/java/common/src/main/java/org/apache/qpidity/Frame.java index d1bebd8365..1c8bb64fcf 100644 --- a/java/common/src/main/java/org/apache/qpidity/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/Frame.java @@ -21,7 +21,13 @@ package org.apache.qpidity; import java.nio.ByteBuffer; -import java.nio.ShortBuffer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; + +import static org.apache.qpidity.Functions.*; + /** * Frame @@ -29,103 +35,127 @@ import java.nio.ShortBuffer; * @author Rafael H. Schloming */ -class Frame +class Frame implements Iterable<ByteBuffer> { + public static final int HEADER_SIZE = 12; + + // XXX: enums? + public static final byte L1 = 0; + public static final byte L2 = 1; + public static final byte L3 = 2; + public static final byte L4 = 3; + + public static final byte METHOD = 1; + public static final byte HEADER = 2; + public static final byte BODY = 3; + + public static final byte RESERVED = 0x0; + + public static final byte FIRST_SEG = 0x8; + public static final byte LAST_SEG = 0x4; + public static final byte FIRST_FRAME = 0x2; + public static final byte LAST_FRAME = 0x1; - public static final short L1 = 0; - public static final short L2 = 1; - public static final short L3 = 2; - public static final short L4 = 3; - - public static final short METHOD = 1; - public static final short HEADER = 2; - public static final short BODY = 3; - - final private short channel; - final private short track; - final private short type; - final private boolean firstSegment; - final private boolean lastSegment; - final private boolean firstFrame; - final private boolean lastFrame; - final private ByteBuffer payload; - - // XXX - final private int sequence = 0; - - public Frame(short channel, short track, short type, boolean firstSegment, - boolean lastSegment, boolean firstFrame, boolean lastFrame, - ByteBuffer payload) + final private byte flags; + final private byte type; + final private byte track; + final private int channel; + final private List<ByteBuffer> fragments; + private int size; + + public Frame(byte flags, byte type, byte track, int channel) { - this.channel = channel; - this.track = track; + this.flags = flags; this.type = type; - this.firstSegment = firstSegment; - this.lastSegment = lastSegment; - this.firstFrame = firstFrame; - this.lastFrame = lastFrame; - this.payload = payload; + this.track = track; + this.channel = channel; + this.size = 0; + this.fragments = new ArrayList<ByteBuffer>(); } - public short getChannel() + public void addFragment(ByteBuffer fragment) + { + fragments.add(fragment); + size += fragment.remaining(); + } + + public int getChannel() { return channel; } - public short getTrack() + public int getSize() { - return track; + return size; } - public short getType() + public byte getType() { return type; } + public byte getTrack() + { + return track; + } + + private boolean flag(byte mask) + { + return (flags & mask) != 0; + } + public boolean isFirstSegment() { - return firstSegment; + return flag(FIRST_SEG); } public boolean isLastSegment() { - return lastSegment; + return flag(LAST_SEG); } public boolean isFirstFrame() { - return firstFrame; + return flag(FIRST_FRAME); } public boolean isLastFrame() { - return lastFrame; + return flag(LAST_FRAME); } - public ByteBuffer getPayload() + public Iterator<ByteBuffer> getFragments() { - return payload.slice(); + return new SliceIterator(fragments.iterator()); } - public int getSize() + public Iterator<ByteBuffer> iterator() { - return payload.remaining(); + return getFragments(); } public String toString() { StringBuilder str = new StringBuilder(); str.append(String.format - ("[%05d %05d %1d %1d %d%d%d%d]", channel, getSize(), track, type, - firstSegment ? 1 : 0, lastSegment ? 1 : 0, - firstFrame ? 1 : 0, lastFrame ? 1 : 0)); - ShortBuffer shorts = payload.asShortBuffer(); - for (int i = 0; i < shorts.limit(); i++) { - str.append(String.format(" %04x", shorts.get(i))); - if (str.length() > 70) { - str.append(" ..."); - break; + ("[%05d %05d %1d %1d %d%d%d%d]", getChannel(), getSize(), + getTrack(), getType(), + isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, + isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); + + boolean first = true; + for (ByteBuffer buf : this) + { + if (first) + { + first = false; + } + else + { + str.append(" | "); } + + str.append(str(buf)); } return str.toString(); diff --git a/java/common/src/main/java/org/apache/qpidity/Functions.java b/java/common/src/main/java/org/apache/qpidity/Functions.java new file mode 100644 index 0000000000..f5217bf038 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/Functions.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.ByteBuffer; + + +/** + * Functions + * + * @author Rafael H. Schloming + */ + +class Functions +{ + + public static final short unsigned(byte b) + { + return (short) ((0x100 + b) & 0xFF); + } + + public static final int unsigned(short s) + { + return (0x10000 + s) & 0xFFFF; + } + + public static final long unsigned(int i) + { + return (0x1000000000L + i) & 0xFFFFFFFFL; + } + + public static final byte lsb(int i) + { + return (byte) (0xFF & i); + } + + public static final byte lsb(long l) + { + return (byte) (0xFF & l); + } + + public static final String str(ByteBuffer buf) + { + return str(buf, buf.limit()); + } + + public static final String str(ByteBuffer buf, int limit) + { + StringBuilder str = new StringBuilder(); + for (int i = 0; i < limit; i++) + { + if (i > 0 && i % 2 == 0) + { + str.append(" "); + } + str.append(String.format("%02x", buf.get(i))); + } + + return str.toString(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/InputHandler.java b/java/common/src/main/java/org/apache/qpidity/InputHandler.java new file mode 100644 index 0000000000..5cd4fe87c1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/InputHandler.java @@ -0,0 +1,230 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.ByteBuffer; + +import static org.apache.qpidity.InputHandler.State.*; + + +/** + * InputHandler + * + * @author Rafael H. Schloming + */ + +class InputHandler implements Handler<ByteBuffer> +{ + + enum State + { + PROTO_HDR, + PROTO_HDR_M, + PROTO_HDR_Q, + PROTO_HDR_P, + PROTO_HDR_CLASS, + PROTO_HDR_INSTANCE, + PROTO_HDR_MAJOR, + PROTO_HDR_MINOR, + FRAME_HDR, + FRAME_HDR_TYPE, + FRAME_HDR_SIZE1, + FRAME_HDR_SIZE2, + FRAME_HDR_RSVD1, + FRAME_HDR_TRACK, + FRAME_HDR_CH1, + FRAME_HDR_CH2, + FRAME_HDR_RSVD2, + FRAME_HDR_RSVD3, + FRAME_HDR_RSVD4, + FRAME_HDR_RSVD5, + FRAME_PAYLOAD, + FRAME_FRAGMENT, + ERROR; + } + + private final ProtocolActions actions; + private State state; + + private byte instance; + private byte major; + private byte minor; + + private byte flags; + private byte type; + private byte track; + private int channel; + private int size; + private Frame frame; + + public InputHandler(ProtocolActions actions, State state) + { + this.actions = actions; + this.state = state; + } + + public InputHandler(ProtocolActions actions) + { + this(actions, PROTO_HDR); + } + + private void init() + { + actions.init(new ProtocolHeader(instance, major, minor)); + } + + private void frame() + { + assert size == frame.getSize(); + actions.frame(frame); + frame = null; + } + + private void error(String fmt, Object ... args) + { + actions.error(new ProtocolError(fmt, args)); + } + + public void handle(ByteBuffer buf) + { + while (buf.hasRemaining()) + { + state = next(buf); + } + } + + private State next(ByteBuffer buf) + { + switch (state) { + case PROTO_HDR: + return expect(buf, 'A', PROTO_HDR_M); + case PROTO_HDR_M: + return expect(buf, 'M', PROTO_HDR_Q); + case PROTO_HDR_Q: + return expect(buf, 'Q', PROTO_HDR_P); + case PROTO_HDR_P: + return expect(buf, 'P', PROTO_HDR_CLASS); + case PROTO_HDR_CLASS: + return expect(buf, 1, PROTO_HDR_INSTANCE); + case PROTO_HDR_INSTANCE: + instance = buf.get(); + return PROTO_HDR_MAJOR; + case PROTO_HDR_MAJOR: + major = buf.get(); + return PROTO_HDR_MINOR; + case PROTO_HDR_MINOR: + minor = buf.get(); + init(); + return FRAME_HDR; + case FRAME_HDR: + flags = buf.get(); + return FRAME_HDR_TYPE; + case FRAME_HDR_TYPE: + type = buf.get(); + return FRAME_HDR_SIZE1; + case FRAME_HDR_SIZE1: + size = buf.get() << 8; + return FRAME_HDR_SIZE2; + case FRAME_HDR_SIZE2: + size += buf.get(); + size -= 12; + return FRAME_HDR_RSVD1; + case FRAME_HDR_RSVD1: + return expect(buf, 0, FRAME_HDR_TRACK); + case FRAME_HDR_TRACK: + byte b = buf.get(); + if ((b & 0xF0) != 0) { + error("non-zero reserved bits in upper nibble of " + + "frame header byte 5: '%x'", b); + return ERROR; + } else { + track = (byte) (b & 0xF); + return FRAME_HDR_CH1; + } + case FRAME_HDR_CH1: + channel = buf.get() << 8; + return FRAME_HDR_CH2; + case FRAME_HDR_CH2: + channel += buf.get(); + return FRAME_HDR_RSVD2; + case FRAME_HDR_RSVD2: + return expect(buf, 0, FRAME_HDR_RSVD3); + case FRAME_HDR_RSVD3: + return expect(buf, 0, FRAME_HDR_RSVD4); + case FRAME_HDR_RSVD4: + return expect(buf, 0, FRAME_HDR_RSVD5); + case FRAME_HDR_RSVD5: + return expect(buf, 0, FRAME_PAYLOAD); + case FRAME_PAYLOAD: + frame = new Frame(flags, type, track, channel); + if (size > buf.remaining()) { + frame.addFragment(buf.slice()); + buf.position(buf.limit()); + return FRAME_FRAGMENT; + } else { + ByteBuffer payload = buf.slice(); + payload.limit(size); + buf.position(buf.position() + size); + frame.addFragment(payload); + frame(); + return FRAME_HDR; + } + case FRAME_FRAGMENT: + int delta = size - frame.getSize(); + if (delta > buf.remaining()) { + frame.addFragment(buf.slice()); + buf.position(buf.limit()); + return FRAME_FRAGMENT; + } else { + ByteBuffer fragment = buf.slice(); + fragment.limit(delta); + buf.position(buf.position() + delta); + frame.addFragment(fragment); + frame(); + return FRAME_HDR; + } + default: + throw new IllegalStateException(); + } + } + + private State expect(ByteBuffer buf, int expected, State next) + { + return expect(buf, (byte) expected, next); + } + + private State expect(ByteBuffer buf, char expected, State next) + { + return expect(buf, (byte) expected, next); + } + + private State expect(ByteBuffer buf, byte expected, State next) + { + byte b = buf.get(); + if (b == expected) { + return next; + } else { + error("expecting '%x', got '%x'", expected, b); + return ERROR; + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Method.java b/java/common/src/main/java/org/apache/qpidity/Method.java index 75e3bdfbaa..a3926b572e 100644 --- a/java/common/src/main/java/org/apache/qpidity/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/Method.java @@ -31,4 +31,8 @@ interface Method extends Struct { int getEncodedType(); + byte getEncodedTrack(); + + byte getSegmentType(); + } diff --git a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java index a2a4ebb0fc..8fbc6f547d 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java @@ -22,26 +22,35 @@ package org.apache.qpidity; import java.nio.ByteBuffer; +import java.util.Iterator; + + /** * A MethodDispatcher parses and dispatches a method segment. * * @author Rafael H. Schloming */ -class MethodDispatcher<C extends DelegateResolver<C>> - implements Handler<Event<C,Segment>> +class MethodDispatcher<C> implements Handler<Event<C,Segment>> { - // XXX: should be passed in - final private StructFactory factory = new StructFactory_v0_10(); + final private StructFactory factory; + final private DelegateResolver<C> resolver; + + public MethodDispatcher(StructFactory factory, DelegateResolver<C> resolver) + { + this.factory = factory; + this.resolver = resolver; + } public void handle(Event<C,Segment> event) { System.out.println("got method segment:\n " + event.target); - ByteBuffer bb = event.target.getPayload(); - int type = bb.getInt(); - Struct struct = factory.create(type, new BBDecoder(bb)); - Delegate<C> delegate = event.context.resolve(struct); + Iterator<ByteBuffer> fragments = event.target.getFragments(); + Decoder dec = new FragmentDecoder(fragments); + int type = (int) dec.readLong(); + Struct struct = factory.create(type, dec); + Delegate<C> delegate = resolver.resolve(struct); struct.delegate(event.context, delegate); } diff --git a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java index ebc7471622..8f43f4c06f 100644 --- a/java/common/src/main/java/org/apache/qpidity/MethodHandler.java +++ b/java/common/src/main/java/org/apache/qpidity/MethodHandler.java @@ -29,12 +29,13 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class MethodHandler<C extends DelegateResolver<C>> extends TypeSwitch<C> +class MethodHandler<C> extends TypeSwitch<C> { - public MethodHandler() + public MethodHandler(StructFactory factory, DelegateResolver<C> resolver) { - map(Frame.METHOD, new SegmentAssembler<C>(new MethodDispatcher<C>())); + MethodDispatcher md = new MethodDispatcher<C>(factory, resolver); + map(Frame.METHOD, new SegmentAssembler<C>(md)); } } diff --git a/java/common/src/main/java/org/apache/qpidity/MinaHandler.java b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java new file mode 100644 index 0000000000..b54f53dd80 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/MinaHandler.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; + +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketConnector; + + +/** + * MinaHandler + * + * @author Rafael H. Schloming + */ + +class MinaHandler implements IoHandler +{ + + public void messageReceived(IoSession ssn, Object obj) + { + Connection conn = (Connection) ssn.getAttachment(); + ByteBuffer buf = (ByteBuffer) obj; + conn.getInputHandler().handle(buf.buf()); + } + + public void messageSent(IoSession ssn, Object obj) + { + System.out.println("TX: " + obj); + } + + public void exceptionCaught(IoSession ssn, Throwable e) + { + e.printStackTrace(); + } + + public void sessionCreated(IoSession ssn) + { + System.out.println("created " + ssn); + } + + public void sessionOpened(final IoSession ssn) + { + Connection conn = new Connection(new Handler<java.nio.ByteBuffer>() + { + public void handle(java.nio.ByteBuffer buf) + { + ssn.write(ByteBuffer.wrap(buf)); + } + }); + ssn.setAttachment(conn); + } + + public void sessionClosed(IoSession ssn) + { + System.out.println("closed " + ssn); + ssn.setAttachment(null); + } + + public void sessionIdle(IoSession ssn, IdleStatus status) + { + System.out.println(status); + } + + public static final void main(String[] args) throws IOException + { + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + if (args[0].equals("accept")) { + accept(args); + } else if (args[0].equals("connect")) { + connect(args); + } + } + + public static final void accept(String[] args) throws IOException + { + IoAcceptor acceptor = new SocketAcceptor(); + acceptor.bind(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler()); + } + + public static final void connect(String[] args) + { + IoConnector connector = new SocketConnector(); + ConnectFuture cf = connector.connect(new InetSocketAddress("0.0.0.0", 5672), new MinaHandler()); + cf.join(); + IoSession ssn = cf.getSession(); + ByteBuffer bb = ByteBuffer.allocate(1024); + bb.put("AMQP".getBytes()); + bb.flip(); + for (int i = 0; i < 10; i++) { + ssn.write(bb); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java b/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java new file mode 100644 index 0000000000..c658320989 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * ProtocolActions + * + * @author Rafael H. Schloming + */ + +interface ProtocolActions +{ + + void init(ProtocolHeader header); + + void frame(Frame frame); + + void error(ProtocolError error); + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolError.java b/java/common/src/main/java/org/apache/qpidity/ProtocolError.java new file mode 100644 index 0000000000..a4a83fad35 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolError.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * ProtocolError + * + * @author Rafael H. Schloming + */ + +class ProtocolError +{ + + private final String format; + private final Object[] args; + + public ProtocolError(String format, Object ... args) + { + this.format = format; + this.args = args; + } + + public String getMessage() + { + return String.format(format, args); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java new file mode 100644 index 0000000000..526ef50211 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHandler.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * ProtocolHandler + * + * @author Rafael H. Schloming + */ + +interface ProtocolHandler +{ + + void init(ProtocolHeader header); + + void frame(Frame frame); + + void error(ProtocolError error); + +} diff --git a/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java new file mode 100644 index 0000000000..1f12e285e4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * ProtocolHeader + * + * @author Rafael H. Schloming + */ + +class ProtocolHeader +{ + + final private byte instance; + final private byte major; + final private byte minor; + + public ProtocolHeader(byte instance, byte major, byte minor) + { + this.instance = instance; + this.major = major; + this.minor = minor; + } + + public byte getInstance() + { + return instance; + } + + public byte getMajor() + { + return major; + } + + public byte getMinor() + { + return minor; + } + + public String toString() + { + return String.format("AMQP.%d %d-%d", instance, major, minor); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Segment.java b/java/common/src/main/java/org/apache/qpidity/Segment.java index cd1d4ab05d..ee9f60fad8 100644 --- a/java/common/src/main/java/org/apache/qpidity/Segment.java +++ b/java/common/src/main/java/org/apache/qpidity/Segment.java @@ -20,10 +20,14 @@ */ package org.apache.qpidity; -import java.nio.ByteBuffer; - import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; + +import java.nio.ByteBuffer; + +import static org.apache.qpidity.Functions.*; + /** * Segment @@ -31,50 +35,40 @@ import java.util.Collection; * @author Rafael H. Schloming */ -class Segment +class Segment implements Iterable<ByteBuffer> { - private final Collection<Frame> frames = new ArrayList<Frame>(); + private final Collection<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); - public void add(Frame frame) + public void add(ByteBuffer fragment) { - frames.add(frame); + fragments.add(fragment); } - public ByteBuffer getPayload() + public Iterator<ByteBuffer> getFragments() { - // we should probably use our own decoder interface here so - // that we can directly read from the incoming frame objects - // and automatically skip frame boundaries without copying - // everything in order to get a contiguous byte buffer - int capacity = 0; - for (Frame frame : frames) - { - capacity += frame.getSize(); - } - ByteBuffer buf = ByteBuffer.allocate(capacity); - for (Frame frame : frames) - { - buf.put(frame.getPayload()); - } - buf.flip(); - return buf; + return new SliceIterator(fragments.iterator()); + } + + public Iterator<ByteBuffer> iterator() + { + return getFragments(); } public String toString() { - StringBuffer buf = new StringBuffer(); - String sep = ",\n "; + StringBuilder str = new StringBuilder(); + String sep = " | "; - for (Frame f : frames) + for (ByteBuffer buf : this) { - buf.append(f.toString()); - buf.append(sep); + str.append(str(buf)); + str.append(sep); } - buf.setLength(buf.length() - sep.length()); + str.setLength(str.length() - sep.length()); - return buf.toString(); + return str.toString(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java b/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java index 2d47e760c1..0fb90a2f53 100644 --- a/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java +++ b/java/common/src/main/java/org/apache/qpidity/SegmentAssembler.java @@ -22,6 +22,7 @@ package org.apache.qpidity; import java.nio.ByteBuffer; + /** * SegmentAssembler is a stateful handler that aggregates Frame events * into Segment events. This should only be used where it is necessary @@ -50,7 +51,10 @@ class SegmentAssembler<C> implements Handler<Event<C,Frame>> segment = new Segment(); } - segment.add(frame); + for (ByteBuffer fragment : frame) + { + segment.add(fragment); + } if (frame.isLastFrame()) { diff --git a/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java new file mode 100644 index 0000000000..7d4e5aabe4 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/SegmentEncoder.java @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static java.lang.Math.*; + +import static org.apache.qpidity.Frame.*; + + +/** + * SegmentEncoder + * + * @author Rafael H. Schloming + */ + +class SegmentEncoder extends AbstractEncoder +{ + + private final Handler<ByteBuffer> handler; + private final int max; + private final byte flags; + private final byte track; + private final byte type; + private final int channel; + + private int remaining; + private ByteBuffer frame; + private boolean first; + + public SegmentEncoder(Handler<ByteBuffer> handler, int max, + byte flags, + byte track, + byte type, + int channel, + int remaining) + { + if (max < HEADER_SIZE + 1) + { + throw new IllegalArgumentException + ("max frame size must be large enough to include header"); + } + + this.handler = handler; + this.max = max; + this.flags = flags; + this.track = track; + this.type = type; + this.channel = channel; + this.remaining = remaining; + this.frame = null; + this.first = true; + } + + private void preWrite() { + if (remaining == 0) + { + throw new BufferOverflowException(); + } + + if (frame == null) + { + frame = ByteBuffer.allocate(min(max, remaining + HEADER_SIZE)); + frame.order(ByteOrder.BIG_ENDIAN); + + byte frameFlags = flags; + if (first) { frameFlags |= FIRST_FRAME; first = false; } + if (remaining <= (frame.remaining() - HEADER_SIZE)) + { + frameFlags |= LAST_FRAME; + } + + frame.put(frameFlags); + frame.put(type); + frame.putShort((short) frame.limit()); + frame.put(RESERVED); + frame.put(track); + frame.putShort((short) channel); + frame.put(RESERVED); + frame.put(RESERVED); + frame.put(RESERVED); + frame.put(RESERVED); + + assert frame.position() == HEADER_SIZE; + } + } + + private void postWrite() { + if (!frame.hasRemaining()) + { + frame.flip(); + handler.handle(frame); + frame = null; + } + } + + @Override protected void put(byte b) + { + preWrite(); + frame.put(b); + remaining -= 1; + postWrite(); + } + + @Override protected void put(ByteBuffer src) + { + if (src.remaining() > remaining) + { + throw new BufferOverflowException(); + } + + while (src.hasRemaining()) + { + preWrite(); + int limit = src.limit(); + src.limit(src.position() + min(frame.remaining(), src.remaining())); + remaining -= src.remaining(); + frame.put(src); + src.limit(limit); + postWrite(); + } + } + + public static final void main(String[] args) { + ByteBuffer buf = ByteBuffer.allocate(1024); + buf.put("AMQP_PROTOCOL_HEADER".getBytes()); + buf.flip(); + + SegmentEncoder enc = new SegmentEncoder(new Handler<ByteBuffer>() + { + public void handle(ByteBuffer frame) + { + System.out.println(frame); + } + }, + 16, + (byte) 0x0, + (byte) Frame.L1, + (byte) Frame.METHOD, + 0, + 7 + buf.remaining()); + enc.put((byte)0); + enc.put((byte)1); + enc.put((byte)2); + enc.put((byte)3); + enc.put((byte)4); + enc.put((byte)5); + enc.put((byte)6); + enc.put(buf); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Session.java b/java/common/src/main/java/org/apache/qpidity/Session.java index 32da625070..7d74849b0b 100644 --- a/java/common/src/main/java/org/apache/qpidity/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/Session.java @@ -29,7 +29,7 @@ import java.util.Map; * @author Rafael H. Schloming */ -public class Session extends Invoker implements DelegateResolver<Session> +public class Session extends Invoker { // channel may be null @@ -37,7 +37,6 @@ public class Session extends Invoker implements DelegateResolver<Session> private int command_id = 0; // XXX final Map<Integer,Handler<Struct>> handlers = new HashMap<Integer,Handler<Struct>>(); - final private Delegate<Session> delegate = new SessionDelegate(); public void attach(Channel channel) { @@ -62,9 +61,4 @@ public class Session extends Invoker implements DelegateResolver<Session> return channel.getFactory(); } - public Delegate<Session> resolve(Struct struct) - { - return delegate; - } - } diff --git a/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java b/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java new file mode 100644 index 0000000000..fe52f0c90b --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/SimpleDelegateResolver.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + + +/** + * SimpleDelegateResolver + * + * @author Rafael H. Schloming + */ + +class SimpleDelegateResolver<C> implements DelegateResolver<C> +{ + + private final Delegate<C> delegate; + + public SimpleDelegateResolver(Delegate<C> delegate) + { + this.delegate = delegate; + } + + public Delegate<C> resolve(Struct struct) + { + return delegate; + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java index 745f58fd6d..b6ecdd2818 100644 --- a/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/SizeEncoder.java @@ -23,7 +23,7 @@ package org.apache.qpidity; import java.nio.ByteBuffer; import java.util.Map; -import java.util.UUID; + /** * SizeEncoder @@ -31,7 +31,7 @@ import java.util.UUID; * @author Rafael H. Schloming */ -class SizeEncoder implements Encoder +class SizeEncoder extends AbstractEncoder { private int size; @@ -52,39 +52,17 @@ class SizeEncoder implements Encoder this.size = size; } - public void writeBit(boolean b) - { - //throw new Error("TODO"); - } - - public void writeOctet(short b) + @Override protected void put(byte b) { size += 1; } - public void writeShort(int s) + @Override protected void put(ByteBuffer src) { - size += 2; + size += src.remaining(); } - public void writeLong(long i) - { - size += 4; - } - - public void writeLonglong(long l) - { - size += 8; - } - - - public void writeTimestamp(long l) - { - size += 8; - } - - - public void writeShortstr(String s) + @Override public void writeShortstr(String s) { if (s.length() > 255) { throw new IllegalArgumentException(s); @@ -93,30 +71,10 @@ class SizeEncoder implements Encoder size += s.length(); } - public void writeLongstr(String s) + @Override public void writeLongstr(String s) { - throw new Error("TODO"); - } - - - public void writeTable(Map<String,?> table) - { - //throw new Error("TODO"); - } - - public void writeRfc1982LongSet(Range<Long>[] ranges) - { - throw new Error("TODO"); - } - - public void writeUuid(UUID uuid) - { - throw new Error("TODO"); - } - - public void writeContent(String c) - { - throw new Error("TODO"); + writeLong(s.length()); + size += s.length(); } } diff --git a/java/common/src/main/java/org/apache/qpidity/SliceIterator.java b/java/common/src/main/java/org/apache/qpidity/SliceIterator.java new file mode 100644 index 0000000000..9b4a8f90f7 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpidity/SliceIterator.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpidity; + +import java.nio.ByteBuffer; + +import java.util.Iterator; + + +/** + * SliceIterator + * + * @author Rafael H. Schloming + */ + +class SliceIterator implements Iterator<ByteBuffer> +{ + + final private Iterator<ByteBuffer> iterator; + + public SliceIterator(Iterator<ByteBuffer> iterator) + { + this.iterator = iterator; + } + + public boolean hasNext() + { + return iterator.hasNext(); + } + + public ByteBuffer next() + { + return iterator.next().slice(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpidity/Stub.java b/java/common/src/main/java/org/apache/qpidity/Stub.java index be246d325b..e27fc89a59 100644 --- a/java/common/src/main/java/org/apache/qpidity/Stub.java +++ b/java/common/src/main/java/org/apache/qpidity/Stub.java @@ -1,32 +1,47 @@ package org.apache.qpidity; -import java.nio.ByteBuffer; import java.util.*; import java.lang.annotation.*; +import java.nio.ByteBuffer; + import static org.apache.qpidity.Option.*; + public class Stub { - private static Connection conn = new Connection(); + private static Connection conn = new Connection(new ConsoleOutput()); + + static + { + conn.init(new ProtocolHeader((byte) 1, (byte) 0, (byte) 10)); + } - private static void frame(short track, short type, boolean first, boolean last) { + private static void frame(byte track, byte type, boolean first, boolean last) { frame(track, type, first, last, null); } - private static void frame(short track, short type, boolean first, boolean last, Method m) { + private static void frame(byte track, byte type, boolean first, boolean last, Method m) { SizeEncoder sizer = new SizeEncoder(); if (m != null) { + sizer.writeLong(m.getEncodedType()); m.write(sizer); + sizer.flush(); } - ByteBuffer buf = ByteBuffer.allocate(sizer.getSize() + 4); + ByteBuffer buf = ByteBuffer.allocate(sizer.getSize()); if (m != null) { - buf.putInt(m.getEncodedType()); - m.write(new BBEncoder(buf)); + Encoder enc = new BBEncoder(buf); + enc.writeLong(m.getEncodedType()); + m.write(enc); + enc.flush(); } buf.flip(); - Frame frame = new Frame((short)0, track, type, true, true, first, last, buf); - conn.handle(frame); + byte flags = 0; + if (first) { flags |= Frame.FIRST_FRAME; } + if (last) { flags |= Frame.LAST_FRAME; } + Frame frame = new Frame(flags, type, track, 0); + frame.addFragment(buf); + conn.frame(frame); } public static final void main(String[] args) { @@ -59,7 +74,16 @@ class ChannelDelegate extends Delegate<Channel> { public @Override void sessionOpen(Channel channel, SessionOpen open) { Session ssn = new Session(); ssn.attach(channel); - System.out.println("Session Open"); + long lifetime = open.getDetachedlifetime(); + System.out.println("Session Opened lifetime = " + lifetime); + try + { + ssn.sessionAttached(UUID.randomUUID(), lifetime); + } + catch (QpidException e) + { + throw new RuntimeException(e); + } } } @@ -74,11 +98,11 @@ class SessionDelegate extends Delegate<Session> { System.out.println("got an exchange declare: " + ed.getExchange() + ", " + ed.getType()); try { - session.queueDeclare("asdf", "alternate", null); + session.queueDeclare("asdf", "alternate", null); } - catch(Exception e) + catch(QpidException e) { - + throw new RuntimeException(e); } } diff --git a/java/common/src/main/java/org/apache/qpidity/Switch.java b/java/common/src/main/java/org/apache/qpidity/Switch.java index a84ae1507b..166dc33134 100644 --- a/java/common/src/main/java/org/apache/qpidity/Switch.java +++ b/java/common/src/main/java/org/apache/qpidity/Switch.java @@ -44,6 +44,12 @@ abstract class Switch<K,E> implements Handler<E> { K key = resolve(event); Handler<E> handler = handlers.get(key); + if (handler == null) + { + throw new IllegalStateException("no such key: " + key + + " this = " + this + + " handlers = " + handlers); + } handler.handle(event); } diff --git a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java b/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java index c9b3cda032..28a7d75f05 100644 --- a/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java +++ b/java/common/src/main/java/org/apache/qpidity/TrackSwitch.java @@ -28,10 +28,10 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class TrackSwitch<C> extends Switch<Short,Event<C,Frame>> +class TrackSwitch<C> extends Switch<Byte,Event<C,Frame>> { - public Short resolve(Event<C,Frame> event) + public Byte resolve(Event<C,Frame> event) { return event.target.getTrack(); } diff --git a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java b/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java index a5073486c7..fc53b0b9b4 100644 --- a/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java +++ b/java/common/src/main/java/org/apache/qpidity/TypeSwitch.java @@ -28,10 +28,10 @@ package org.apache.qpidity; * @author Rafael H. Schloming */ -class TypeSwitch<C> extends Switch<Short,Event<C,Frame>> +class TypeSwitch<C> extends Switch<Byte,Event<C,Frame>> { - public Short resolve(Event<C,Frame> event) + public Byte resolve(Event<C,Frame> event) { return event.target.getType(); } |
