summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-11-26 15:57:46 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-11-26 15:57:46 +0000
commita62268645e691b71a645ea19ca41e93df6c7cff9 (patch)
tree88fd0d7c71d2a4e941085d96a9c3fde0a8d67fa3 /java/client/src
parentc7ae06e49f2376853c0e77afefa0a59a7c9612ea (diff)
downloadqpid-python-a62268645e691b71a645ea19ca41e93df6c7cff9.tar.gz
QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598324 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java77
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java43
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java21
6 files changed, 123 insertions, 65 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 5ee3fa5407..8a1e78d2e0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -21,12 +21,16 @@
package org.apache.qpid.client.protocol;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -208,6 +212,36 @@ public class AMQProtocolHandler extends IoHandlerAdapter
e.printStackTrace();
}
+ if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+ {
+ try
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = session.getFilterChain();
+
+ int buf_size = 32768;
+ if (session.getConfig() instanceof SocketSessionConfig)
+ {
+ buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
+ }
+ session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.attach(chain);
+ session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+
+ _logger.info("Using IO Read/Write Filter Protection");
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+ }
+ }
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 1d0d6a3491..3257caa796 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,14 +23,13 @@ package org.apache.qpid.client.transport;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
-
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,40 +89,44 @@ public class TransportConnection
switch (transport)
{
- case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ case TCP:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
{
- public IoConnector newSocketConnector()
+ SocketConnector result;
+ // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
+ if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
{
- SocketConnector result;
- // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
- {
- _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
- // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
- }
- // else
-
- {
- _logger.info("Using Mina NIO");
- result = new SocketConnector(); // non-blocking connector
- }
-
- // Don't have the connector's worker thread wait around for other connections (we only use
- // one SocketConnector per connection at the moment anyway). This allows short-running
- // clients (like unit tests) to complete quickly.
- result.setWorkerTimeout(0);
-
- return result;
+ _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
+
+
+ result = new MultiThreadSocketConnector();
}
- });
- break;
+ else
+ {
+ _logger.info("Using Mina NIO");
- case VM:
- {
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
- }
+ result = new SocketConnector(); // non-blocking connector
+ }
+
+ // Don't have the connector's worker thread wait around for other connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows short-running
+ // clients (like unit tests) to complete quickly.
+ result.setWorkerTimeout(0);
+
+ return result;
+ }
+ });
+ break;
+
+ case VM:
+ {
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ break;
+ }
}
return _instance;
@@ -145,7 +148,7 @@ public class TransportConnection
}
private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
- throws AMQVMBrokerCreationException
+ throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -160,7 +163,7 @@ public class TransportConnection
else
{
throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
+ + " does not exist. Auto create disabled.", null);
}
}
}
@@ -257,8 +260,8 @@ public class TransportConnection
IoHandlerAdapter provider;
try
{
- Class[] cnstr = { Integer.class };
- Object[] params = { port };
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
@@ -277,7 +280,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
amqbce.initCause(e);
throw amqbce;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index d9137dc8b1..25a9e26285 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -52,7 +52,7 @@ public class VmPipeTransportConnection implements ITransportConnection
final VmPipeConnector ioConnector = new VmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 91f7710025..603b0834a3 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,6 +31,7 @@ public interface BrokerDetails
*/
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+ public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
public static final int DEFAULT_PORT = 5672;
public static final String TCP = "tcp";
@@ -63,9 +64,9 @@ public interface BrokerDetails
long getTimeout();
void setTimeout(long timeout);
-
+
SSLConfiguration getSSLConfiguration();
-
+
void setSSLConfiguration(SSLConfiguration sslConfiguration);
String toString();
diff --git a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
index 4e0d0b79b5..405e1d3081 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
@@ -22,7 +22,6 @@ package org.apache.qpid.jms.failover;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,34 +34,22 @@ public class FailoverRoundRobinServers implements FailoverMethod
/** The default number of times to retry each server */
public static final int DEFAULT_SERVER_RETRIES = 0;
- /**
- * The index into the hostDetails array of the broker to which we are connected
- */
+ /** The index into the hostDetails array of the broker to which we are connected */
private int _currentBrokerIndex = -1;
- /**
- * The number of times to retry connecting for each server
- */
+ /** The number of times to retry connecting for each server */
private int _serverRetries;
- /**
- * The current number of retry attempts made
- */
+ /** The current number of retry attempts made */
private int _currentServerRetry;
- /**
- * The number of times to cycle through the servers
- */
+ /** The number of times to cycle through the servers */
private int _cycleRetries;
- /**
- * The current number of cycles performed.
- */
+ /** The current number of cycles performed. */
private int _currentCycleRetries;
- /**
- * Array of BrokerDetail used to make connections.
- */
+ /** Array of BrokerDetail used to make connections. */
private ConnectionURL _connectionDetails;
public FailoverRoundRobinServers(ConnectionURL connectionDetails)
@@ -189,7 +176,23 @@ public class FailoverRoundRobinServers implements FailoverMethod
}
}
- return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+
+ String delayStr = broker.getOption(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null)
+ {
+ Long delay = Long.parseLong(delayStr);
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ return null;
+ }
+ }
+
+ return broker;
}
public void setBroker(BrokerDetails broker)
diff --git a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
index 68e6d25be0..6bd5318903 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -96,6 +96,23 @@ public class FailoverSingleServer implements FailoverMethod
return _brokerDetail;
}
+
+
+ String delayStr = _brokerDetail.getOption(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null)
+ {
+ Long delay = Long.parseLong(delayStr);
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ return null;
+ }
+ }
+
+ return _brokerDetail;
}
public void setBroker(BrokerDetails broker)