diff options
Diffstat (limited to 'java')
16 files changed, 196 insertions, 78 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0aaeafc442..269937d0bd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -20,25 +20,21 @@ */ package org.apache.qpid.client; -import org.apache.qpid.AMQConnectionFailureException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQProtocolException; -import org.apache.qpid.AMQUnresolvedAddressException; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.configuration.ClientProperties; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.*; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.jms.Connection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.jms.ConnectionURL; -import org.apache.qpid.jms.FailoverPolicy; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.URLSyntaxException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -57,17 +53,33 @@ import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.net.ConnectException; -import java.net.UnknownHostException; -import java.nio.channels.UnresolvedAddressException; -import java.text.MessageFormat; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.qpid.AMQConnectionFailureException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQProtocolException; +import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.configuration.ClientProperties; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicQosBody; +import org.apache.qpid.framing.BasicQosOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.TxSelectBody; +import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.jms.Connection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.ConnectionURL; +import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -356,7 +368,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // use the defaul value set for all connections _syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME); } - + _failoverPolicy = new FailoverPolicy(connectionURL); BrokerDetails brokerDetails = _failoverPolicy.getNextBrokerDetails(); if (brokerDetails.getTransport().equals(BrokerDetails.VM)) @@ -493,7 +505,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - + _connectionMetaData = new QpidConnectionMetaData(this); } @@ -1456,4 +1468,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } + + public void setIdleTimeout(long l) + { + _delegate.setIdleTimeout(l); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 5a4abcc9bb..cec840f5c6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -48,5 +48,6 @@ public interface AMQConnectionDelegate void closeConnection(long timeout) throws JMSException, AMQException; <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E; - + + void setIdleTimeout(long l); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index a2e5ac9800..77860ed60c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -22,7 +22,6 @@ package org.apache.qpid.client; import java.io.IOException; - import java.util.ArrayList; import java.util.List; @@ -31,21 +30,19 @@ import javax.jms.JMSException; import javax.jms.XASession; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQProtocolException; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; -import org.apache.qpid.ErrorCode; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionClose; import org.apache.qpid.transport.ConnectionException; import org.apache.qpid.transport.ConnectionListener; import org.apache.qpid.transport.ProtocolVersionException; import org.apache.qpid.transport.TransportException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,6 +143,17 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec " username: " + _conn.getUsername() + " password: " + _conn.getPassword()); } + + if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null) + { + this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))); + } + else + { + // use the default value set for all connections + this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,0)); + } + _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL()); _conn._connected = true; @@ -273,4 +281,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + public void setIdleTimeout(long l) + { + _qpidConnection.setIdleTimeout(l); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 806e4d67bc..17090875a7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -48,7 +48,6 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; -import org.apache.qpid.transport.network.io.IoTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,5 +287,6 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } } - + + public void setIdleTimeout(long l){} } diff --git a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java index 49ac89d9b3..986154cda8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java +++ b/java/client/src/main/java/org/apache/qpid/client/configuration/ClientProperties.java @@ -46,6 +46,18 @@ public class ClientProperties * type: boolean */ public static final String SYNC_PERSISTENT_PROP_NAME = "sync_persistence"; + + + /** + * This value will be used in the following settings + * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout) + * If this values is between the max and min values specified for heartbeat + * by the broker in TuneOK it will be used as the heartbeat interval. + * If not a warning will be printed and the max value specified for + * heartbeat in TuneOK will be used + */ + public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; + /** * ========================================================== diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 07e1be95dc..c00d983902 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,6 +34,7 @@ public interface BrokerDetails public static final String OPTIONS_RETRY = "retries"; public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; + public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; public static final int DEFAULT_PORT = 5672; public static final String SOCKET = "socket"; @@ -55,7 +56,7 @@ public interface BrokerDetails public static final String VIRTUAL_HOST = "virtualhost"; public static final String CLIENT_ID = "client_id"; public static final String USERNAME = "username"; - public static final String PASSWORD = "password"; + public static final String PASSWORD = "password"; String getHost(); diff --git a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java index f17782ebf4..3c1ea22595 100644 --- a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java +++ b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java @@ -20,12 +20,12 @@ */ package org.apache.qpid; +import static org.apache.qpid.transport.util.Functions.str; + import java.nio.ByteBuffer; import org.apache.qpid.transport.Sender; -import static org.apache.qpid.transport.util.Functions.*; - /** * ConsoleOutput @@ -51,4 +51,13 @@ public class ConsoleOutput implements Sender<ByteBuffer> System.out.println("CLOSED"); } + @Override + public void setIdleTimeout(long l) + { + // TODO Auto-generated method stub + + } + + + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 2604f6970c..276d534b14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,27 +20,17 @@ */ package org.apache.qpid.transport; -import java.util.ArrayList; +import static org.apache.qpid.transport.Connection.State.OPEN; + import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import java.io.UnsupportedEncodingException; - -import org.apache.qpid.QpidException; - -import org.apache.qpid.security.UsernamePasswordCallbackHandler; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import static org.apache.qpid.transport.Connection.State.*; +import org.apache.qpid.security.UsernamePasswordCallbackHandler; +import org.apache.qpid.transport.util.Logger; /** @@ -50,6 +40,7 @@ import static org.apache.qpid.transport.Connection.State.*; public class ClientDelegate extends ConnectionDelegate { + private static final Logger log = Logger.get(ClientDelegate.class); private String vhost; private String username; @@ -121,7 +112,14 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { conn.setChannelMax(tune.getChannelMax()); - conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); + int hb_interval = calculateHeartbeatInterval(conn, + tune.getHeartbeatMin(), + tune.getHeartbeatMax() + ); + conn.connectionTuneOk(tune.getChannelMax(), + tune.getMaxFrameSize(), + hb_interval); + conn.setIdleTimeout(hb_interval*1000); conn.connectionOpen(vhost, null, Option.INSIST); } @@ -134,5 +132,22 @@ public class ClientDelegate extends ConnectionDelegate { throw new UnsupportedOperationException(); } - + + /** + * Currently the spec specified the min and max for heartbeat using secs + */ + private int calculateHeartbeatInterval(Connection conn,int min, int max) + { + long l = conn.getIdleTimeout()/1000; + if (l !=0 && l >= min && l <= max) + { + return (int)l; + } + else + { + log.warn("Ignoring the idle timeout %s set by the connection," + + " using the brokers max value %s", l,max); + return max; + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 56cbf5ee13..2f7e1490ab 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -83,7 +83,8 @@ public class Connection extends ConnectionInvoker private String locale; private SaslServer saslServer; private SaslClient saslClient; - + private long idleTimeout = 0; + // want to make this final private int _connectionId; @@ -114,6 +115,7 @@ public class Connection extends ConnectionInvoker public void setSender(Sender<ProtocolEvent> sender) { this.sender = sender; + sender.setIdleTimeout(idleTimeout); } void setState(State state) @@ -497,6 +499,20 @@ public class Connection extends ConnectionInvoker } } + public void setIdleTimeout(long l) + { + idleTimeout = l; + if (sender != null) + { + sender.setIdleTimeout(l); + } + } + + public long getIdleTimeout() + { + return idleTimeout; + } + public String toString() { return String.format("conn:%x", System.identityHashCode(this)); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/java/common/src/main/java/org/apache/qpid/transport/Sender.java index 9a6f675d7f..475289c83f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sender.java @@ -28,6 +28,7 @@ package org.apache.qpid.transport; public interface Sender<T> { + void setIdleTimeout(long l); void send(T msg); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 7908700cbe..d99ee72d14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,7 +20,15 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.codec.BBEncoder; +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -31,13 +39,7 @@ import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static org.apache.qpid.transport.network.Frame.*; - -import static java.lang.Math.*; +import org.apache.qpid.transport.codec.BBEncoder; /** @@ -235,5 +237,9 @@ public final class Disassembler implements Sender<ProtocolEvent>, { throw new IllegalArgumentException("" + error); } - + + public void setIdleTimeout(long l) + { + sender.setIdleTimeout(l); + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 60abb326f6..a8dee5aaa1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -143,9 +143,6 @@ final class IoReceiver implements Runnable t.getMessage().equalsIgnoreCase("socket closed") && closed.get())) { - log.error(t, "==========================================================="); - log.error(t, "Exception"); - log.error(t, "==========================================================="); receiver.exception(t); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 29f0c766fc..00652e2927 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.transport.network.io; +import static org.apache.qpid.transport.util.Functions.mod; + import java.io.IOException; import java.io.OutputStream; import java.net.Socket; @@ -30,8 +32,6 @@ import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; -import static org.apache.qpid.transport.util.Functions.*; - public final class IoSender implements Runnable, Sender<ByteBuffer> { @@ -56,6 +56,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; + private long idleTimeout; private volatile Throwable exception = null; @@ -223,8 +224,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void run() { - final int size = buffer.length; - + final int size = buffer.length; while (true) { final int hd = head; @@ -294,4 +294,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } + public void setIdleTimeout(long l) + { + try + { + socket.setSoTimeout((int)l*2); + idleTimeout = l; + } + catch (Exception e) + { + throw new SenderException(e); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 69d4061e0c..fbedf14312 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; - import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -77,5 +76,15 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> CloseFuture closed = session.close(); closed.join(); } - + + public void setIdleTimeout(long l) + { + //noop + } + + public long getIdleTimeout() + { + return 0; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java index 8792fce142..5196505c2d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java @@ -118,4 +118,9 @@ public class NioSender implements Sender<java.nio.ByteBuffer> } } } + + public void setIdleTimeout(long l) + { + //noop + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java index 9d60a2ad52..5f456f28b1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java @@ -232,4 +232,9 @@ public class SSLSender implements Sender<ByteBuffer> { return engineState; } + + public void setIdleTimeout(long l) + { + delegate.setIdleTimeout(l); + } } |
