summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-01-23 18:07:49 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-01-23 18:07:49 +0000
commit4e9ee66a78dca84b2c6f2399969ff2f2994151fd (patch)
tree085ecf0067e3e68770ef4796beb616da664905a5 /java/client/src/main
parent3ebc9726ce3681abc73f7e5ecc3bbf598880db7d (diff)
downloadqpid-python-4e9ee66a78dca84b2c6f2399969ff2f2994151fd.tar.gz
This is related to QPID-1609.
Currently we only check idle state on the incomming side. In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737125 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java81
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java22
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java3
6 files changed, 84 insertions, 41 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0aaeafc442..269937d0bd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -20,25 +20,21 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.configuration.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -57,17 +53,33 @@ import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQProtocolException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.configuration.ClientProperties;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -356,7 +368,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the defaul value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
}
-
+
_failoverPolicy = new FailoverPolicy(connectionURL);
BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails();
if (brokerDetails.getTransport().equals(BrokerDetails.VM))
@@ -493,7 +505,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new AMQConnectionFailureException(message, connectionException);
}
-
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -1456,4 +1468,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
+
+ public void setIdleTimeout(long l)
+ {
+ _delegate.setIdleTimeout(l);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 5a4abcc9bb..cec840f5c6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -48,5 +48,6 @@ public interface AMQConnectionDelegate
void closeConnection(long timeout) throws JMSException, AMQException;
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
-
+
+ void setIdleTimeout(long l);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index a2e5ac9800..77860ed60c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
import java.io.IOException;
-
import java.util.ArrayList;
import java.util.List;
@@ -31,21 +30,19 @@ import javax.jms.JMSException;
import javax.jms.XASession;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQProtocolException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.ErrorCode;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ProtocolVersionException;
import org.apache.qpid.transport.TransportException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -146,6 +143,17 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
" username: " + _conn.getUsername() +
" password: " + _conn.getPassword());
}
+
+ if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
+ }
+ else
+ {
+ // use the default value set for all connections
+ this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0));
+ }
+
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL());
_conn._connected = true;
@@ -273,4 +281,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
+ public void setIdleTimeout(long l)
+ {
+ _qpidConnection.setIdleTimeout(l);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 806e4d67bc..17090875a7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -48,7 +48,6 @@ import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
-import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,5 +287,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
}
-
+
+ public void setIdleTimeout(long l){}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
index 49ac89d9b3..986154cda8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
+++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java
@@ -46,6 +46,18 @@ public class ClientProperties
* type: boolean
*/
public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence";
+
+
+ /**
+ * This value will be used in the following settings
+ * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout)
+ * If this values is between the max and min values specified for heartbeat
+ * by the broker in TuneOK it will be used as the heartbeat interval.
+ * If not a warning will be printed and the max value specified for
+ * heartbeat in TuneOK will be used
+ */
+ public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
+
/**
* ==========================================================
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 07e1be95dc..c00d983902 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -34,6 +34,7 @@ public interface BrokerDetails
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
public static final int DEFAULT_PORT = 5672;
public static final String SOCKET = "socket";
@@ -55,7 +56,7 @@ public interface BrokerDetails
public static final String VIRTUAL_HOST = "virtualhost";
public static final String CLIENT_ID = "client_id";
public static final String USERNAME = "username";
- public static final String PASSWORD = "password";
+ public static final String PASSWORD = "password";
String getHost();