From a62268645e691b71a645ea19ca41e93df6c7cff9 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 26 Nov 2007 15:57:46 +0000 Subject: QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598324 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/protocol/AMQProtocolHandler.java | 36 +++++++++- .../qpid/client/transport/TransportConnection.java | 77 +++++++++++----------- .../transport/VmPipeTransportConnection.java | 2 +- .../java/org/apache/qpid/jms/BrokerDetails.java | 9 +-- .../jms/failover/FailoverRoundRobinServers.java | 43 ++++++------ .../qpid/jms/failover/FailoverSingleServer.java | 21 +++++- 6 files changed, 123 insertions(+), 65 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5ee3fa5407..8a1e78d2e0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -21,12 +21,16 @@ package org.apache.qpid.client.protocol; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolCodecFilter; - +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -208,6 +212,36 @@ public class AMQProtocolHandler extends IoHandlerAdapter e.printStackTrace(); } + if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio")) + { + try + { + //Add IO Protection Filters + IoFilterChain chain = session.getFilterChain(); + + int buf_size = 32768; + if (session.getConfig() instanceof SocketSessionConfig) + { + buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize(); + } + session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.attach(chain); + session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + + _logger.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 1d0d6a3491..3257caa796 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,14 +23,13 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,40 +89,44 @@ public class TransportConnection switch (transport) { - case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + case TCP: + _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() { - public IoConnector newSocketConnector() + SocketConnector result; + // FIXME - this needs to be sorted to use the new Mina MultiThread SA. + if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) { - SocketConnector result; - // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (Boolean.getBoolean("qpidnio")) - { - _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set."); - // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector - } - // else - - { - _logger.info("Using Mina NIO"); - result = new SocketConnector(); // non-blocking connector - } - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - - return result; + _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); + + + result = new MultiThreadSocketConnector(); } - }); - break; + else + { + _logger.info("Using Mina NIO"); - case VM: - { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; - } + result = new SocketConnector(); // non-blocking connector + } + + // Don't have the connector's worker thread wait around for other connections (we only use + // one SocketConnector per connection at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + result.setWorkerTimeout(0); + + return result; + } + }); + break; + + case VM: + { + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + break; + } } return _instance; @@ -145,7 +148,7 @@ public class TransportConnection } private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException + throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -160,7 +163,7 @@ public class TransportConnection else { throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); + + " does not exist. Auto create disabled.", null); } } } @@ -257,8 +260,8 @@ public class TransportConnection IoHandlerAdapter provider; try { - Class[] cnstr = { Integer.class }; - Object[] params = { port }; + Class[] cnstr = {Integer.class}; + Object[] params = {port}; provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); @@ -277,7 +280,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); amqbce.initCause(e); throw amqbce; } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index d9137dc8b1..25a9e26285 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -52,7 +52,7 @@ public class VmPipeTransportConnection implements ITransportConnection final VmPipeConnector ioConnector = new VmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 91f7710025..603b0834a3 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,6 +31,7 @@ public interface BrokerDetails */ public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; public static final int DEFAULT_PORT = 5672; public static final String TCP = "tcp"; @@ -63,9 +64,9 @@ public interface BrokerDetails long getTimeout(); void setTimeout(long timeout); - + SSLConfiguration getSSLConfiguration(); - + void setSSLConfiguration(SSLConfiguration sslConfiguration); String toString(); diff --git a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java index 4e0d0b79b5..405e1d3081 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java +++ b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java @@ -22,7 +22,6 @@ package org.apache.qpid.jms.failover; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,34 +34,22 @@ public class FailoverRoundRobinServers implements FailoverMethod /** The default number of times to retry each server */ public static final int DEFAULT_SERVER_RETRIES = 0; - /** - * The index into the hostDetails array of the broker to which we are connected - */ + /** The index into the hostDetails array of the broker to which we are connected */ private int _currentBrokerIndex = -1; - /** - * The number of times to retry connecting for each server - */ + /** The number of times to retry connecting for each server */ private int _serverRetries; - /** - * The current number of retry attempts made - */ + /** The current number of retry attempts made */ private int _currentServerRetry; - /** - * The number of times to cycle through the servers - */ + /** The number of times to cycle through the servers */ private int _cycleRetries; - /** - * The current number of cycles performed. - */ + /** The current number of cycles performed. */ private int _currentCycleRetries; - /** - * Array of BrokerDetail used to make connections. - */ + /** Array of BrokerDetail used to make connections. */ private ConnectionURL _connectionDetails; public FailoverRoundRobinServers(ConnectionURL connectionDetails) @@ -189,7 +176,23 @@ public class FailoverRoundRobinServers implements FailoverMethod } } - return _connectionDetails.getBrokerDetails(_currentBrokerIndex); + BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex); + + String delayStr = broker.getOption(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null) + { + Long delay = Long.parseLong(delayStr); + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + return null; + } + } + + return broker; } public void setBroker(BrokerDetails broker) diff --git a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java index 68e6d25be0..6bd5318903 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java +++ b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -96,6 +96,23 @@ public class FailoverSingleServer implements FailoverMethod return _brokerDetail; } + + + String delayStr = _brokerDetail.getOption(BrokerDetails.OPTIONS_CONNECT_DELAY); + if (delayStr != null) + { + Long delay = Long.parseLong(delayStr); + try + { + Thread.sleep(delay); + } + catch (InterruptedException ie) + { + return null; + } + } + + return _brokerDetail; } public void setBroker(BrokerDetails broker) -- cgit v1.2.1