diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2009-01-23 18:07:49 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2009-01-23 18:07:49 +0000 |
| commit | 4e9ee66a78dca84b2c6f2399969ff2f2994151fd (patch) | |
| tree | 085ecf0067e3e68770ef4796beb616da664905a5 /java/common/src | |
| parent | 3ebc9726ce3681abc73f7e5ecc3bbf598880db7d (diff) | |
| download | qpid-python-4e9ee66a78dca84b2c6f2399969ff2f2994151fd.tar.gz | |
This is related to QPID-1609.
Currently we only check idle state on the incomming side.
In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737125 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
10 files changed, 112 insertions, 37 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java index f17782ebf4..3c1ea22595 100644 --- a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java +++ b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java @@ -20,12 +20,12 @@ */ package org.apache.qpid; +import static org.apache.qpid.transport.util.Functions.str; + import java.nio.ByteBuffer; import org.apache.qpid.transport.Sender; -import static org.apache.qpid.transport.util.Functions.*; - /** * ConsoleOutput @@ -51,4 +51,13 @@ public class ConsoleOutput implements Sender<ByteBuffer> System.out.println("CLOSED"); } + @Override + public void setIdleTimeout(long l) + { + // TODO Auto-generated method stub + + } + + + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 2604f6970c..276d534b14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,27 +20,17 @@ */ package org.apache.qpid.transport; -import java.util.ArrayList; +import static org.apache.qpid.transport.Connection.State.OPEN; + import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import java.io.UnsupportedEncodingException; - -import org.apache.qpid.QpidException; - -import org.apache.qpid.security.UsernamePasswordCallbackHandler; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import static org.apache.qpid.transport.Connection.State.*; +import org.apache.qpid.security.UsernamePasswordCallbackHandler; +import org.apache.qpid.transport.util.Logger; /** @@ -50,6 +40,7 @@ import static org.apache.qpid.transport.Connection.State.*; public class ClientDelegate extends ConnectionDelegate { + private static final Logger log = Logger.get(ClientDelegate.class); private String vhost; private String username; @@ -121,7 +112,14 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { conn.setChannelMax(tune.getChannelMax()); - conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); + int hb_interval = calculateHeartbeatInterval(conn, + tune.getHeartbeatMin(), + tune.getHeartbeatMax() + ); + conn.connectionTuneOk(tune.getChannelMax(), + tune.getMaxFrameSize(), + hb_interval); + conn.setIdleTimeout(hb_interval*1000); conn.connectionOpen(vhost, null, Option.INSIST); } @@ -134,5 +132,22 @@ public class ClientDelegate extends ConnectionDelegate { throw new UnsupportedOperationException(); } - + + /** + * Currently the spec specified the min and max for heartbeat using secs + */ + private int calculateHeartbeatInterval(Connection conn,int min, int max) + { + long l = conn.getIdleTimeout()/1000; + if (l !=0 && l >= min && l <= max) + { + return (int)l; + } + else + { + log.warn("Ignoring the idle timeout %s set by the connection," + + " using the brokers max value %s", l,max); + return max; + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 56cbf5ee13..2f7e1490ab 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -83,7 +83,8 @@ public class Connection extends ConnectionInvoker private String locale; private SaslServer saslServer; private SaslClient saslClient; - + private long idleTimeout = 0; + // want to make this final private int _connectionId; @@ -114,6 +115,7 @@ public class Connection extends ConnectionInvoker public void setSender(Sender<ProtocolEvent> sender) { this.sender = sender; + sender.setIdleTimeout(idleTimeout); } void setState(State state) @@ -497,6 +499,20 @@ public class Connection extends ConnectionInvoker } } + public void setIdleTimeout(long l) + { + idleTimeout = l; + if (sender != null) + { + sender.setIdleTimeout(l); + } + } + + public long getIdleTimeout() + { + return idleTimeout; + } + public String toString() { return String.format("conn:%x", System.identityHashCode(this)); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/java/common/src/main/java/org/apache/qpid/transport/Sender.java index 9a6f675d7f..475289c83f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sender.java @@ -28,6 +28,7 @@ package org.apache.qpid.transport; public interface Sender<T> { + void setIdleTimeout(long l); void send(T msg); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 7908700cbe..d99ee72d14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,7 +20,15 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.codec.BBEncoder; +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -31,13 +39,7 @@ import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static org.apache.qpid.transport.network.Frame.*; - -import static java.lang.Math.*; +import org.apache.qpid.transport.codec.BBEncoder; /** @@ -235,5 +237,9 @@ public final class Disassembler implements Sender<ProtocolEvent>, { throw new IllegalArgumentException("" + error); } - + + public void setIdleTimeout(long l) + { + sender.setIdleTimeout(l); + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 60abb326f6..a8dee5aaa1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -143,9 +143,6 @@ final class IoReceiver implements Runnable t.getMessage().equalsIgnoreCase("socket closed") && closed.get())) { - log.error(t, "==========================================================="); - log.error(t, "Exception"); - log.error(t, "==========================================================="); receiver.exception(t); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 29f0c766fc..00652e2927 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.transport.network.io; +import static org.apache.qpid.transport.util.Functions.mod; + import java.io.IOException; import java.io.OutputStream; import java.net.Socket; @@ -30,8 +32,6 @@ import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; -import static org.apache.qpid.transport.util.Functions.*; - public final class IoSender implements Runnable, Sender<ByteBuffer> { @@ -56,6 +56,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; + private long idleTimeout; private volatile Throwable exception = null; @@ -223,8 +224,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> public void run() { - final int size = buffer.length; - + final int size = buffer.length; while (true) { final int hd = head; @@ -294,4 +294,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } + public void setIdleTimeout(long l) + { + try + { + socket.setSoTimeout((int)l*2); + idleTimeout = l; + } + catch (Exception e) + { + throw new SenderException(e); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 69d4061e0c..fbedf14312 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; - import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -77,5 +76,15 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> CloseFuture closed = session.close(); closed.join(); } - + + public void setIdleTimeout(long l) + { + //noop + } + + public long getIdleTimeout() + { + return 0; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java index 8792fce142..5196505c2d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java @@ -118,4 +118,9 @@ public class NioSender implements Sender<java.nio.ByteBuffer> } } } + + public void setIdleTimeout(long l) + { + //noop + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java index 9d60a2ad52..5f456f28b1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java @@ -232,4 +232,9 @@ public class SSLSender implements Sender<ByteBuffer> { return engineState; } + + public void setIdleTimeout(long l) + { + delegate.setIdleTimeout(l); + } } |
