diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-08-08 17:08:37 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-08-08 17:08:37 +0000 |
| commit | 8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7 (patch) | |
| tree | a30d0180600d2570d3bb02e3d16393de28f9f5ca /qpid/java/common/src | |
| parent | bd6c228a52e5085ac4e01e56ddcc0eb9d5fd5218 (diff) | |
| download | qpid-python-8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7.tar.gz | |
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
5 files changed, 155 insertions, 9 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java index 903b5bfa7a..a2fc3a03ef 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java @@ -50,4 +50,14 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock return buffer; } + public java.nio.ByteBuffer toNioByteBuffer() + { + final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize()); + + ByteBuffer buf = ByteBuffer.wrap(buffer); + writePayload(buf); + buffer.flip(); + return buffer; + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 59003225b7..44cc9586a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.*; +import org.apache.qpid.transport.network.io.IoSender; import org.apache.qpid.AMQException; /** @@ -54,4 +55,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; + public void setSender(IoSender sender); + public void init(); + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java new file mode 100644 index 0000000000..b63020913b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java @@ -0,0 +1,109 @@ +package org.apache.qpid.transport.network.io; + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.BodyFactory; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentBodyFactory; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentHeaderBodyFactory; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.HeartbeatBodyFactory; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Receiver; + +public class InputHandler_0_9 implements Receiver<ByteBuffer> +{ + + private AMQVersionAwareProtocolSession _session; + private MethodRegistry _registry; + private BodyFactory bodyFactory; + private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE]; + + static + { + _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance(); + _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance(); + _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory(); + } + + public InputHandler_0_9(AMQVersionAwareProtocolSession session) + { + _session = session; + _registry = _session.getMethodRegistry(); + } + + public void closed() + { + // AS FIXME: implement + } + + public void exception(Throwable t) + { + // TODO: propogate exception to things + t.printStackTrace(); + } + + public void received(ByteBuffer buf) + { + org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf); + try + { + final byte type = in.get(); + if (type == AMQMethodBody.TYPE) + { + bodyFactory = new AMQMethodBodyFactory(_session); + } + else + { + bodyFactory = _bodiesSupported[type]; + } + + if (bodyFactory == null) + { + throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null); + } + + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) + { + throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize, null); + } + + AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) + { + throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type, null); + } + + try + { + frame.getBodyFrame().handle(frame.getChannel(), _session); + } + catch (AMQException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + catch (AMQFrameDecodingException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 23f48a06de..ef892744ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -31,7 +31,7 @@ import org.apache.qpid.transport.util.Logger; import static org.apache.qpid.transport.util.Functions.*; -final class IoSender extends Thread implements Sender<ByteBuffer> +public final class IoSender extends Thread implements Sender<ByteBuffer> { private static final Logger log = Logger.get(IoSender.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 3b543b3e60..7a17ef6b73 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -26,6 +26,7 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Receiver; @@ -82,6 +83,19 @@ public final class IoTransport private Connection connectInternal(String host, int port, ConnectionDelegate delegate) { + createSocket(host, port); + + sender = new IoSender(this, 2*writeBufferSize, timeout); + Connection conn = new Connection + (new Disassembler(sender, 64*1024 - 1), delegate); + receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), + 2*readBufferSize, timeout); + + return conn; + } + + private void createSocket(String host, int port) + { try { InetAddress address = InetAddress.getByName(host); @@ -108,14 +122,6 @@ public final class IoTransport { throw new TransportException("Error connecting to broker", e); } - - sender = new IoSender(this, 2*writeBufferSize, timeout); - Connection conn = new Connection - (new Disassembler(sender, 64*1024 - 1), delegate); - receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), - 2*readBufferSize, timeout); - - return conn; } IoSender getSender() @@ -133,4 +139,21 @@ public final class IoTransport return socket; } + public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port) + { + IoTransport handler = new IoTransport(); + handler.connectInternal_0_9(session, host, port); + } + + public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port) + { + + createSocket(host, port); + + sender = new IoSender(this, 2*writeBufferSize, timeout); + receiver = new IoReceiver(this, new InputHandler_0_9(session), + 2*readBufferSize, timeout); + session.setSender(sender); + } + } |
