diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2008-05-05 08:35:56 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2008-05-05 08:35:56 +0000 |
| commit | ec3f643e64f294cbe82464b59bdad35f9a5a4172 (patch) | |
| tree | 83eb2d4e76035e10a4ccc1781b0e5610c971e5fc /qpid/java/common/src | |
| parent | c2b3043b119707511cee1b1952a8aa4e097badc6 (diff) | |
| download | qpid-python-ec3f643e64f294cbe82464b59bdad35f9a5a4172.tar.gz | |
QPID-1006 and QPID-1007: -QPID-1006:use same socket buffer size and frame size -QPID-1007: added io write handler into MINA chain
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@653354 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java | 68 |
1 files changed, 64 insertions, 4 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java index 7da719087c..9e890feec6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java @@ -27,7 +27,14 @@ import java.net.SocketAddress; import org.apache.mina.common.*; import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.MemoryHandlerSocketConnector; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.qpidity.transport.Binding; import org.apache.qpidity.transport.Connection; @@ -41,6 +48,12 @@ import org.apache.qpidity.transport.network.Assembler; import org.apache.qpidity.transport.network.Disassembler; import org.apache.qpidity.transport.network.InputHandler; import org.apache.qpidity.transport.network.OutputHandler; +import org.apache.qpid.client.failover.FailoverHandler; +import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.pool.ReadWriteThreadModel; /** @@ -51,6 +64,11 @@ import org.apache.qpidity.transport.network.OutputHandler; //RA making this public until we sort out the package issues public class MinaHandler<E> implements IoHandler { + private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + /** Default buffer size for pending messages writes */ + private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; private static final Logger log = Logger.get(MinaHandler.class); @@ -85,9 +103,45 @@ public class MinaHandler<E> implements IoHandler attachment.receiver.exception(e); } - public void sessionCreated(final IoSession ssn) + /** + * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the + * session, which filters the events handled by this handler. The filter chain consists of, handing off events + * to an optional protectio + * + * @param session The MINA session. + * @throws Exception Any underlying exceptions are allowed to fall through to MINA. + */ + public void sessionCreated(IoSession session) throws Exception { - // do nothing + log.debug("Protocol session created for session " + System.identityHashCode(session)); + + if (Boolean.getBoolean("protectio")) + { + try + { + //Add IO Protection Filters + IoFilterChain chain = session.getFilterChain(); + + session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize( + Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize( + Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); + writefilter.attach(chain); + session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + + log.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + log.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } } public void sessionOpened(final IoSession ssn) @@ -147,7 +201,7 @@ public class MinaHandler<E> implements IoHandler IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(address, new MinaHandler<E>(binding)); } - + public static final <E> E connect(String host, int port, Binding<E,java.nio.ByteBuffer> binding) { @@ -161,6 +215,11 @@ public class MinaHandler<E> implements IoHandler SocketConnector connector = new SocketConnector(); IoServiceConfig acceptorConfig = connector.getDefaultConfig(); acceptorConfig.setThreadModel(ThreadModel.MANUAL); + SocketSessionConfig scfg = (SocketSessionConfig) acceptorConfig.getSessionConfig(); + scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); + scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); + scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); + connector.setWorkerTimeout(0); ConnectFuture cf = connector.connect(address, handler); cf.join(); @@ -221,7 +280,8 @@ public class MinaHandler<E> implements IoHandler { // XXX: hardcoded max-frame return new Connection - (new Disassembler(new OutputHandler(sender), 64*1024 - 1), + (new Disassembler(new OutputHandler(sender), + Math.min(DEFAULT_BUFFER_SIZE, Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)) - 1), delegate); } |
