From 4e9ee66a78dca84b2c6f2399969ff2f2994151fd Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 23 Jan 2009 18:07:49 +0000 Subject: This is related to QPID-1609. Currently we only check idle state on the incomming side. In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737125 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/ConsoleOutput.java | 13 +++++- .../org/apache/qpid/transport/ClientDelegate.java | 47 ++++++++++++++-------- .../java/org/apache/qpid/transport/Connection.java | 18 ++++++++- .../java/org/apache/qpid/transport/Sender.java | 1 + .../qpid/transport/network/Disassembler.java | 24 ++++++----- .../qpid/transport/network/io/IoReceiver.java | 3 -- .../apache/qpid/transport/network/io/IoSender.java | 20 +++++++-- .../qpid/transport/network/mina/MinaSender.java | 13 +++++- .../qpid/transport/network/nio/NioSender.java | 5 +++ .../qpid/transport/network/ssl/SSLSender.java | 5 +++ 10 files changed, 112 insertions(+), 37 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java index f17782ebf4..3c1ea22595 100644 --- a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java +++ b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java @@ -20,12 +20,12 @@ */ package org.apache.qpid; +import static org.apache.qpid.transport.util.Functions.str; + import java.nio.ByteBuffer; import org.apache.qpid.transport.Sender; -import static org.apache.qpid.transport.util.Functions.*; - /** * ConsoleOutput @@ -51,4 +51,13 @@ public class ConsoleOutput implements Sender System.out.println("CLOSED"); } + @Override + public void setIdleTimeout(long l) + { + // TODO Auto-generated method stub + + } + + + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 2604f6970c..276d534b14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -20,27 +20,17 @@ */ package org.apache.qpid.transport; -import java.util.ArrayList; +import static org.apache.qpid.transport.Connection.State.OPEN; + import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; - -import java.io.UnsupportedEncodingException; - -import org.apache.qpid.QpidException; - -import org.apache.qpid.security.UsernamePasswordCallbackHandler; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import static org.apache.qpid.transport.Connection.State.*; +import org.apache.qpid.security.UsernamePasswordCallbackHandler; +import org.apache.qpid.transport.util.Logger; /** @@ -50,6 +40,7 @@ import static org.apache.qpid.transport.Connection.State.*; public class ClientDelegate extends ConnectionDelegate { + private static final Logger log = Logger.get(ClientDelegate.class); private String vhost; private String username; @@ -121,7 +112,14 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionTune(Connection conn, ConnectionTune tune) { conn.setChannelMax(tune.getChannelMax()); - conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax()); + int hb_interval = calculateHeartbeatInterval(conn, + tune.getHeartbeatMin(), + tune.getHeartbeatMax() + ); + conn.connectionTuneOk(tune.getChannelMax(), + tune.getMaxFrameSize(), + hb_interval); + conn.setIdleTimeout(hb_interval*1000); conn.connectionOpen(vhost, null, Option.INSIST); } @@ -134,5 +132,22 @@ public class ClientDelegate extends ConnectionDelegate { throw new UnsupportedOperationException(); } - + + /** + * Currently the spec specified the min and max for heartbeat using secs + */ + private int calculateHeartbeatInterval(Connection conn,int min, int max) + { + long l = conn.getIdleTimeout()/1000; + if (l !=0 && l >= min && l <= max) + { + return (int)l; + } + else + { + log.warn("Ignoring the idle timeout %s set by the connection," + + " using the brokers max value %s", l,max); + return max; + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 56cbf5ee13..2f7e1490ab 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -83,7 +83,8 @@ public class Connection extends ConnectionInvoker private String locale; private SaslServer saslServer; private SaslClient saslClient; - + private long idleTimeout = 0; + // want to make this final private int _connectionId; @@ -114,6 +115,7 @@ public class Connection extends ConnectionInvoker public void setSender(Sender sender) { this.sender = sender; + sender.setIdleTimeout(idleTimeout); } void setState(State state) @@ -497,6 +499,20 @@ public class Connection extends ConnectionInvoker } } + public void setIdleTimeout(long l) + { + idleTimeout = l; + if (sender != null) + { + sender.setIdleTimeout(l); + } + } + + public long getIdleTimeout() + { + return idleTimeout; + } + public String toString() { return String.format("conn:%x", System.identityHashCode(this)); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/java/common/src/main/java/org/apache/qpid/transport/Sender.java index 9a6f675d7f..475289c83f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Sender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Sender.java @@ -28,6 +28,7 @@ package org.apache.qpid.transport; public interface Sender { + void setIdleTimeout(long l); void send(T msg); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 7908700cbe..d99ee72d14 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,7 +20,15 @@ */ package org.apache.qpid.transport.network; -import org.apache.qpid.transport.codec.BBEncoder; +import static java.lang.Math.min; +import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; +import static org.apache.qpid.transport.network.Frame.FIRST_SEG; +import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; +import static org.apache.qpid.transport.network.Frame.LAST_FRAME; +import static org.apache.qpid.transport.network.Frame.LAST_SEG; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -31,13 +39,7 @@ import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static org.apache.qpid.transport.network.Frame.*; - -import static java.lang.Math.*; +import org.apache.qpid.transport.codec.BBEncoder; /** @@ -235,5 +237,9 @@ public final class Disassembler implements Sender, { throw new IllegalArgumentException("" + error); } - + + public void setIdleTimeout(long l) + { + sender.setIdleTimeout(l); + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 60abb326f6..a8dee5aaa1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -143,9 +143,6 @@ final class IoReceiver implements Runnable t.getMessage().equalsIgnoreCase("socket closed") && closed.get())) { - log.error(t, "==========================================================="); - log.error(t, "Exception"); - log.error(t, "==========================================================="); receiver.exception(t); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 29f0c766fc..00652e2927 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -18,6 +18,8 @@ */ package org.apache.qpid.transport.network.io; +import static org.apache.qpid.transport.util.Functions.mod; + import java.io.IOException; import java.io.OutputStream; import java.net.Socket; @@ -30,8 +32,6 @@ import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.util.Logger; -import static org.apache.qpid.transport.util.Functions.*; - public final class IoSender implements Runnable, Sender { @@ -56,6 +56,7 @@ public final class IoSender implements Runnable, Sender private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; + private long idleTimeout; private volatile Throwable exception = null; @@ -223,8 +224,7 @@ public final class IoSender implements Runnable, Sender public void run() { - final int size = buffer.length; - + final int size = buffer.length; while (true) { final int hd = head; @@ -294,4 +294,16 @@ public final class IoSender implements Runnable, Sender } } + public void setIdleTimeout(long l) + { + try + { + socket.setSoTimeout((int)l*2); + idleTimeout = l; + } + catch (Exception e) + { + throw new SenderException(e); + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index 69d4061e0c..fbedf14312 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; import org.apache.mina.common.WriteFuture; - import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -77,5 +76,15 @@ public class MinaSender implements Sender CloseFuture closed = session.close(); closed.join(); } - + + public void setIdleTimeout(long l) + { + //noop + } + + public long getIdleTimeout() + { + return 0; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java index 8792fce142..5196505c2d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java @@ -118,4 +118,9 @@ public class NioSender implements Sender } } } + + public void setIdleTimeout(long l) + { + //noop + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java index 9d60a2ad52..5f456f28b1 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java @@ -232,4 +232,9 @@ public class SSLSender implements Sender { return engineState; } + + public void setIdleTimeout(long l) + { + delegate.setIdleTimeout(l); + } } -- cgit v1.2.1