summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-08-08 17:08:37 +0000
committerAidan Skinner <aidan@apache.org>2008-08-08 17:08:37 +0000
commit8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7 (patch)
treea30d0180600d2570d3bb02e3d16393de28f9f5ca /qpid/java/common/src
parentbd6c228a52e5085ac4e01e56ddcc0eb9d5fd5218 (diff)
downloadqpid-python-8ecc4ecc20d23685c8e2a835b855bfb60fdf5bf7.tar.gz
QPID-1218 Optionally use IoTransport, it's hot, but doesn't pass all the tests yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684016 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java109
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java39
5 files changed, 155 insertions, 9 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
index 903b5bfa7a..a2fc3a03ef 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
@@ -50,4 +50,14 @@ public abstract class AMQDataBlock implements EncodableAMQDataBlock
return buffer;
}
+ public java.nio.ByteBuffer toNioByteBuffer()
+ {
+ final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+ ByteBuffer buf = ByteBuffer.wrap(buffer);
+ writePayload(buf);
+ buffer.flip();
+ return buffer;
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 59003225b7..44cc9586a9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.*;
+import org.apache.qpid.transport.network.io.IoSender;
import org.apache.qpid.AMQException;
/**
@@ -54,4 +55,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
+ public void setSender(IoSender sender);
+ public void init();
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
new file mode 100644
index 0000000000..b63020913b
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/InputHandler_0_9.java
@@ -0,0 +1,109 @@
+package org.apache.qpid.transport.network.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentBodyFactory;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderBodyFactory;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.HeartbeatBodyFactory;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Receiver;
+
+public class InputHandler_0_9 implements Receiver<ByteBuffer>
+{
+
+ private AMQVersionAwareProtocolSession _session;
+ private MethodRegistry _registry;
+ private BodyFactory bodyFactory;
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
+
+ static
+ {
+ _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
+ _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
+ _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
+ }
+
+ public InputHandler_0_9(AMQVersionAwareProtocolSession session)
+ {
+ _session = session;
+ _registry = _session.getMethodRegistry();
+ }
+
+ public void closed()
+ {
+ // AS FIXME: implement
+ }
+
+ public void exception(Throwable t)
+ {
+ // TODO: propogate exception to things
+ t.printStackTrace();
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ org.apache.mina.common.ByteBuffer in = org.apache.mina.common.ByteBuffer.wrap(buf);
+ try
+ {
+ final byte type = in.get();
+ if (type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = new AMQMethodBodyFactory(_session);
+ }
+ else
+ {
+ bodyFactory = _bodiesSupported[type];
+ }
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+ }
+
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if ((channel < 0) || (bodySize < 0))
+ {
+ throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
+ + " bodySize = " + bodySize, null);
+ }
+
+ AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
+ + " type=" + type, null);
+ }
+
+ try
+ {
+ frame.getBodyFrame().handle(frame.getChannel(), _session);
+ }
+ catch (AMQException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 23f48a06de..ef892744ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -31,7 +31,7 @@ import org.apache.qpid.transport.util.Logger;
import static org.apache.qpid.transport.util.Functions.*;
-final class IoSender extends Thread implements Sender<ByteBuffer>
+public final class IoSender extends Thread implements Sender<ByteBuffer>
{
private static final Logger log = Logger.get(IoSender.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 3b543b3e60..7a17ef6b73 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -26,6 +26,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
@@ -82,6 +83,19 @@ public final class IoTransport
private Connection connectInternal(String host, int port,
ConnectionDelegate delegate)
{
+ createSocket(host, port);
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ Connection conn = new Connection
+ (new Disassembler(sender, 64*1024 - 1), delegate);
+ receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
+ 2*readBufferSize, timeout);
+
+ return conn;
+ }
+
+ private void createSocket(String host, int port)
+ {
try
{
InetAddress address = InetAddress.getByName(host);
@@ -108,14 +122,6 @@ public final class IoTransport
{
throw new TransportException("Error connecting to broker", e);
}
-
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- Connection conn = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
- receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
- 2*readBufferSize, timeout);
-
- return conn;
}
IoSender getSender()
@@ -133,4 +139,21 @@ public final class IoTransport
return socket;
}
+ public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port)
+ {
+ IoTransport handler = new IoTransport();
+ handler.connectInternal_0_9(session, host, port);
+ }
+
+ public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port)
+ {
+
+ createSocket(host, port);
+
+ sender = new IoSender(this, 2*writeBufferSize, timeout);
+ receiver = new IoReceiver(this, new InputHandler_0_9(session),
+ 2*readBufferSize, timeout);
+ session.setSender(sender);
+ }
+
}