diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-30 16:16:14 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-30 16:16:14 +0000 |
| commit | 20eaf3b517d109f108f58909fb629d81b0df6bc4 (patch) | |
| tree | a38275d7a70795e34d6361e43ac87aa7f173e491 /qpid/java | |
| parent | 30c528b1ace05766ac1854dd2aef569490acb36f (diff) | |
| download | qpid-python-20eaf3b517d109f108f58909fb629d81b0df6bc4.tar.gz | |
Added a very basic plain nio transport. You could flip between the transports using -Dtransport="nio". By default it's the MINA transport.
You can also turn on batching for the nio transport by using -Dbatch="true". This option has no effect on MINA.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@599856 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 246 insertions, 5 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 32665c2a24..257d96bbe2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -103,7 +103,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration()); origMessage.setJMSMessageID(message.getJMSMessageID()); origMessage.setJMSDeliveryMode(deliveryMode); - + BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); if (contentHeaderProperties.reset()) { @@ -159,6 +159,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer catch (RuntimeException rte) { JMSException ex = new JMSException("Exception when sending message"); + rte.printStackTrace(); ex.setLinkedException(rte); throw ex; } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index a5483a9c52..f6e7911078 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -19,6 +19,7 @@ import org.apache.qpidity.transport.ConnectionEvent; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.SessionDelegate; import org.apache.qpidity.transport.network.mina.MinaHandler; +import org.apache.qpidity.transport.network.nio.NioHandler; public class Client implements org.apache.qpidity.nclient.Connection @@ -72,7 +73,16 @@ public class Client implements org.apache.qpidity.nclient.Connection connectionDelegate.setPassword(password); connectionDelegate.setVirtualHost(virtualHost); - _conn = MinaHandler.connect(host, port,connectionDelegate); + if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) + { + System.out.println("using NIO"); + _conn = NioHandler.connect(host, port,connectionDelegate); + } + else + { + System.out.println("using MINA"); + _conn = MinaHandler.connect(host, port,connectionDelegate); + } // XXX: hardcoded version numbers _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10))); @@ -119,6 +129,11 @@ public class Client implements org.apache.qpidity.nclient.Connection ClientSession ssn = new ClientSession(); ssn.attach(ch); ssn.sessionOpen(expiryInSeconds); + if (Boolean.getBoolean("batch") && System.getProperty("transport").equalsIgnoreCase("nio")) + { + System.out.println("using batching"); + NioHandler.startBatchingFrames(_conn.getConnectionId()); + } return ssn; } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index 4c5993455c..34f902061e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -21,7 +21,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { static { - String max = "message_size_before_sync"; + String max = "message_size_before_sync"; // KB's try { MAX_NOT_SYNC_DATA_LENGH = new Long(System.getProperties().getProperty(max, "200000000")); @@ -132,7 +132,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen public void endData() { super.endData(); - if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH) + /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH) { sync(); } @@ -140,7 +140,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen { executionFlush(); _currentDataSizeNotFlushed = 0; - } + }*/ } public RangeSet getAccquiredMessages() diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java index c9299111a1..7aee5a11ae 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java @@ -48,6 +48,8 @@ public class Connection final private Sender<ConnectionEvent> sender; final private ConnectionDelegate delegate; + // want to make this final + private int _connectionId; final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>(); @@ -58,6 +60,16 @@ public class Connection this.delegate = delegate; } + public void setConnectionId(int id) + { + _connectionId = id; + } + + public int getConnectionId() + { + return _connectionId; + } + public ConnectionDelegate getConnectionDelegate() { return delegate; diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java new file mode 100644 index 0000000000..c5ba3b564d --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java @@ -0,0 +1,118 @@ +package org.apache.qpidity.transport.network.nio; + +import java.io.EOFException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpidity.transport.Connection; +import org.apache.qpidity.transport.ConnectionDelegate; +import org.apache.qpidity.transport.Receiver; +import org.apache.qpidity.transport.network.Assembler; +import org.apache.qpidity.transport.network.Disassembler; +import org.apache.qpidity.transport.network.InputHandler; +import org.apache.qpidity.transport.network.OutputHandler; + +public class NioHandler implements Runnable +{ + private Receiver<ByteBuffer> _receiver; + private SocketChannel _ch; + private ByteBuffer _readBuf; + private static Map<Integer,NioSender> _handlers = new ConcurrentHashMap<Integer,NioSender>(); + private AtomicInteger _count = new AtomicInteger(); + + private NioHandler(){} + + public static final Connection connect(String host, int port, + ConnectionDelegate delegate) + { + NioHandler handler = new NioHandler(); + return handler.connectInternal(host,port,delegate); + } + + private Connection connectInternal(String host, int port, + ConnectionDelegate delegate) + { + try + { + SocketAddress address = new InetSocketAddress(host,port); + _ch = SocketChannel.open(); + _ch.socket().setReuseAddress(true); + _ch.configureBlocking(true); + _ch.socket().setTcpNoDelay(true); + if (address != null) + { + _ch.socket().connect(address); + } + while (_ch.isConnectionPending()) + { + + } + + } + catch (SocketException e) + { + + e.printStackTrace(); + } + catch (IOException e) + { + e.printStackTrace(); + } + + NioSender sender = new NioSender(_ch); + Connection con = new Connection + (new Disassembler(new OutputHandler(sender), 64*1024 - 1), + delegate); + + con.setConnectionId(_count.incrementAndGet()); + _handlers.put(con.getConnectionId(),sender); + + _receiver = new InputHandler(new Assembler(con), InputHandler.State.FRAME_HDR); + + Thread t = new Thread(this); + t.start(); + + return con; + } + + public void run() + { + _readBuf = ByteBuffer.allocate(1024); + long read = 0; + while(_ch.isConnected() && _ch.isOpen()) + { + try + { + read = _ch.read(_readBuf); + if (read > 0) + { + ByteBuffer b = _readBuf; + b.flip(); + _receiver.received(b); + _readBuf.clear(); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + + //throw new EOFException("The underlying socket/channel has closed"); + } + + public static void startBatchingFrames(int connectionId) + { + NioSender sender = _handlers.get(connectionId); + sender.setStartBatching(); + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java new file mode 100644 index 0000000000..2cfe6c2089 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java @@ -0,0 +1,95 @@ +package org.apache.qpidity.transport.network.nio; + +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.qpidity.transport.Sender; + +public class NioSender implements Sender<java.nio.ByteBuffer> +{ + private final Object lock = new Object(); + private SocketChannel _ch; + private boolean _batch = false; + private ByteBuffer _batcher; + + public NioSender(SocketChannel ch) + { + this._ch = ch; + } + + public void send(java.nio.ByteBuffer buf) + { + if (_batch) + { + //System.out.println(_batcher.position() + " , " + buf.remaining() + " , " + buf.position() + ","+_batcher.capacity()); + if (_batcher.position() + buf.remaining() >= _batcher.capacity()) + { + _batcher.flip(); + write(_batcher); + _batcher.clear(); + if (buf.remaining() > _batcher.capacity()) + { + write(buf); + } + else + { + _batcher.put(buf); + } + } + else + { + _batcher.put(buf); + } + } + else + { + write(buf); + } + } + + private void write(java.nio.ByteBuffer buf) + { + synchronized (lock) + { + if( _ch.isConnected() && _ch.isOpen()) + { + try + { + _ch.write(buf); + } + catch(Exception e) + { + e.fillInStackTrace(); + } + } + else + { + throw new RuntimeException("Trying to write on a closed socket"); + } + + } + } + + public void setStartBatching() + { + _batch = true; + _batcher = ByteBuffer.allocate(1024); + } + + public void close() + { + // MINA will sometimes throw away in-progress writes when you + // ask it to close + synchronized (lock) + { + try + { + _ch.close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } +} |
