From b39a1fdca3d12691c67d51ea2a3cdc77a29f6430 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Tue, 22 Jul 2008 18:33:00 +0000 Subject: Updated the io transport to use a separate write thread with a circular buffer that does opportunistic write batching. Fixed error handling and shutdown for the io transport. Switched default from mina to the io transport for the 0-10 client. Modified InputHandler to accumulate bytes in the outer loop and simplified the state machine accordingly. These changes should address QPID-1188, prevent the Java client from running out of memory when writing messages faster than the network and/or broker can keep up, and in general improve performance. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@678848 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 19 ++++++++++-------- .../java/org/apache/qpidity/nclient/Client.java | 23 +++++----------------- 2 files changed, 16 insertions(+), 26 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 472eaef5b5..b3b3cc1ffd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -59,6 +59,7 @@ import javax.naming.Referenceable; import javax.naming.StringRefAddr; import java.io.IOException; import java.net.ConnectException; +import java.net.UnknownHostException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; import java.util.*; @@ -467,6 +468,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (connectionException.getCause() != null) { message = connectionException.getCause().getMessage(); + connectionException.getCause().printStackTrace(); } else { @@ -486,18 +488,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - AMQException e = new AMQConnectionFailureException(message, connectionException); - - if (connectionException != null) + for (Throwable th = connectionException; th != null; th = th.getCause()) { - if (connectionException instanceof UnresolvedAddressException) + if (th instanceof UnresolvedAddressException || + th instanceof UnknownHostException) { - e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), - null); + throw new AMQUnresolvedAddressException + (message, + _failoverPolicy.getCurrentBrokerDetails().toString(), + connectionException); } - } - throw e; + + throw new AMQConnectionFailureException(message, connectionException); } _connectionMetaData = new QpidConnectionMetaData(this); diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 7de7c71d36..f8d5bbcb1c 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -43,7 +43,7 @@ import org.apache.qpidity.transport.ConnectionCloseOk; import org.apache.qpidity.transport.TransportConstants; import org.apache.qpidity.transport.ProtocolHeader; import org.apache.qpidity.transport.SessionDelegate; -import org.apache.qpidity.transport.network.io.IoHandler; +import org.apache.qpidity.transport.network.io.IoTransport; import org.apache.qpidity.transport.network.mina.MinaHandler; import org.apache.qpidity.transport.network.nio.NioHandler; import org.slf4j.Logger; @@ -167,15 +167,16 @@ public class Client implements org.apache.qpidity.nclient.Connection connectionDelegate.setPassword(password); connectionDelegate.setVirtualHost(virtualHost); - if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) + String transport = System.getProperty("transport","io"); + if (transport.equalsIgnoreCase("nio")) { _logger.info("using NIO Transport"); _conn = NioHandler.connect(host, port,connectionDelegate); } - else if (System.getProperty("transport","mina").equalsIgnoreCase("io")) + else if (transport.equalsIgnoreCase("io")) { _logger.info("using Plain IO Transport"); - _conn = IoHandler.connect(host, port,connectionDelegate); + _conn = IoTransport.connect(host, port,connectionDelegate); } else { @@ -287,20 +288,6 @@ public class Client implements org.apache.qpidity.nclient.Connection ssn.attach(ch); ssn.sessionAttach(ssn.getName()); ssn.sessionRequestTimeout(expiryInSeconds); - String transport = System.getProperty("transport","mina"); - - try - { - if (Boolean.getBoolean("batch") && ("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport))) - { - _logger.debug("using batch mode in transport " + transport); - IoHandler.startBatchingFrames(_conn.getConnectionId()); - } - } - catch(Exception e) - { - e.printStackTrace(); - } return ssn; } -- cgit v1.2.1