summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-07-22 18:33:00 +0000
committerRafael H. Schloming <rhs@apache.org>2008-07-22 18:33:00 +0000
commitb39a1fdca3d12691c67d51ea2a3cdc77a29f6430 (patch)
treea83e861b4ee16994c34b3b3b2589706aa7eef5d9 /java/client/src
parent1be9772b6d6ba7dbaaf74062b1b87bd49bc6f642 (diff)
downloadqpid-python-b39a1fdca3d12691c67d51ea2a3cdc77a29f6430.tar.gz
Updated the io transport to use a separate write thread with a circular buffer that does opportunistic write batching. Fixed error handling and shutdown for the io transport. Switched default from mina to the io transport for the 0-10 client. Modified InputHandler to accumulate bytes in the outer loop and simplified the state machine accordingly. These changes should address QPID-1188, prevent the Java client from running out of memory when writing messages faster than the network and/or broker can keep up, and in general improve performance.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@678848 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java19
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Client.java23
2 files changed, 16 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 472eaef5b5..b3b3cc1ffd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -59,6 +59,7 @@ import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.*;
@@ -467,6 +468,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (connectionException.getCause() != null)
{
message = connectionException.getCause().getMessage();
+ connectionException.getCause().printStackTrace();
}
else
{
@@ -486,18 +488,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- AMQException e = new AMQConnectionFailureException(message, connectionException);
-
- if (connectionException != null)
+ for (Throwable th = connectionException; th != null; th = th.getCause())
{
- if (connectionException instanceof UnresolvedAddressException)
+ if (th instanceof UnresolvedAddressException ||
+ th instanceof UnknownHostException)
{
- e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
- null);
+ throw new AMQUnresolvedAddressException
+ (message,
+ _failoverPolicy.getCurrentBrokerDetails().toString(),
+ connectionException);
}
-
}
- throw e;
+
+ throw new AMQConnectionFailureException(message, connectionException);
}
_connectionMetaData = new QpidConnectionMetaData(this);
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index 7de7c71d36..f8d5bbcb1c 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -43,7 +43,7 @@ import org.apache.qpidity.transport.ConnectionCloseOk;
import org.apache.qpidity.transport.TransportConstants;
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.SessionDelegate;
-import org.apache.qpidity.transport.network.io.IoHandler;
+import org.apache.qpidity.transport.network.io.IoTransport;
import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.transport.network.nio.NioHandler;
import org.slf4j.Logger;
@@ -167,15 +167,16 @@ public class Client implements org.apache.qpidity.nclient.Connection
connectionDelegate.setPassword(password);
connectionDelegate.setVirtualHost(virtualHost);
- if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
+ String transport = System.getProperty("transport","io");
+ if (transport.equalsIgnoreCase("nio"))
{
_logger.info("using NIO Transport");
_conn = NioHandler.connect(host, port,connectionDelegate);
}
- else if (System.getProperty("transport","mina").equalsIgnoreCase("io"))
+ else if (transport.equalsIgnoreCase("io"))
{
_logger.info("using Plain IO Transport");
- _conn = IoHandler.connect(host, port,connectionDelegate);
+ _conn = IoTransport.connect(host, port,connectionDelegate);
}
else
{
@@ -287,20 +288,6 @@ public class Client implements org.apache.qpidity.nclient.Connection
ssn.attach(ch);
ssn.sessionAttach(ssn.getName());
ssn.sessionRequestTimeout(expiryInSeconds);
- String transport = System.getProperty("transport","mina");
-
- try
- {
- if (Boolean.getBoolean("batch") && ("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport)))
- {
- _logger.debug("using batch mode in transport " + transport);
- IoHandler.startBatchingFrames(_conn.getConnectionId());
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
return ssn;
}