diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-01-23 18:07:49 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-01-23 18:07:49 +0000 |
| commit | 4e9ee66a78dca84b2c6f2399969ff2f2994151fd (patch) | |
| tree | 085ecf0067e3e68770ef4796beb616da664905a5 /java/client | |
| parent | 3ebc9726ce3681abc73f7e5ecc3bbf598880db7d (diff) | |
| download | qpid-python-4e9ee66a78dca84b2c6f2399969ff2f2994151fd.tar.gz | |
This is related to QPID-1609.
Currently we only check idle state on the incomming side.
In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737125 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
6 files changed, 84 insertions, 41 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0aaeafc442..269937d0bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,25 +20,21 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQConnectionFailureException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQProtocolException; -import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.configuration.ClientProperties; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.*; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.Connection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.FailoverPolicy; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.URLSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -57,17 +53,33 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQProtocolException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.configuration.ClientProperties; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -356,7 +368,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); } - + _failoverPolicy = new FailoverPolicy(connectionURL); BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails(); if (brokerDetails.getTransport().equals(BrokerDetails.VM)) @@ -493,7 +505,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -1456,4 +1468,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } + + public void setIdleTimeout(long l) + { + _delegate.setIdleTimeout(l); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5a4abcc9bb..cec840f5c6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -48,5 +48,6 @@ public interface AMQConnectionDelegate void closeConnection(long timeout) throws JMSException, AMQException; <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - + + void setIdleTimeout(long l); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a2e5ac9800..77860ed60c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -22,7 +22,6 @@ package org.apache.qpid.client; import java.io.IOException; - import java.util.ArrayList; import java.util.List; @@ -31,21 +30,19 @@ import javax.jms.JMSException; import javax.jms.XASession; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQProtocolException; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpid.ErrorCode; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ProtocolVersionException; import org.apache.qpid.transport.TransportException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,6 +143,17 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec " username: " + _conn.getUsername() + " password: " + _conn.getPassword()); } + + if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); + } + else + { + // use the default value set for all connections + this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0)); + } + _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL()); _conn._connected = true; @@ -273,4 +281,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + public void setIdleTimeout(long l) + { + _qpidConnection.setIdleTimeout(l); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 806e4d67bc..17090875a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -48,7 +48,6 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; -import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,5 +287,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } } - + + public void setIdleTimeout(long l){} } diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 49ac89d9b3..986154cda8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -46,6 +46,18 @@ public class ClientProperties * type: boolean */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; + + + /** + * This value will be used in the following settings + * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) + * If this values is between the max and min values specified for heartbeat + * by the broker in TuneOK it will be used as the heartbeat interval. + * If not a warning will be printed and the max value specified for + * heartbeat in TuneOK will be used + */ + public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; + /** * ========================================================== diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 07e1be95dc..c00d983902 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,6 +34,7 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; public static final int DEFAULT_PORT = 5672; public static final String SOCKET = "socket"; @@ -55,7 +56,7 @@ public interface BrokerDetails public static final String VIRTUAL_HOST = "virtualhost"; public static final String CLIENT_ID = "client_id"; public static final String USERNAME = "username"; - public static final String PASSWORD = "password"; + public static final String PASSWORD = "password"; String getHost(); |
