summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2009-01-23 18:07:49 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2009-01-23 18:07:49 +0000
commit4e9ee66a78dca84b2c6f2399969ff2f2994151fd (patch)
tree085ecf0067e3e68770ef4796beb616da664905a5 /java/common/src
parent3ebc9726ce3681abc73f7e5ecc3bbf598880db7d (diff)
downloadqpid-python-4e9ee66a78dca84b2c6f2399969ff2f2994151fd.tar.gz
This is related to QPID-1609.
Currently we only check idle state on the incomming side. In the future we plan to add code to send a heartbeat when we reach the idle state on the outgoing side. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737125 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/ConsoleOutput.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java47
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java18
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Sender.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java3
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java20
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java13
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java5
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java5
10 files changed, 112 insertions, 37 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
index f17782ebf4..3c1ea22595 100644
--- a/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
+++ b/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid;
+import static org.apache.qpid.transport.util.Functions.str;
+
import java.nio.ByteBuffer;
import org.apache.qpid.transport.Sender;
-import static org.apache.qpid.transport.util.Functions.*;
-
/**
* ConsoleOutput
@@ -51,4 +51,13 @@ public class ConsoleOutput implements Sender<ByteBuffer>
System.out.println("CLOSED");
}
+ @Override
+ public void setIdleTimeout(long l)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 2604f6970c..276d534b14 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -20,27 +20,17 @@
*/
package org.apache.qpid.transport;
-import java.util.ArrayList;
+import static org.apache.qpid.transport.Connection.State.OPEN;
+
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import java.io.UnsupportedEncodingException;
-
-import org.apache.qpid.QpidException;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import static org.apache.qpid.transport.Connection.State.*;
+import org.apache.qpid.security.UsernamePasswordCallbackHandler;
+import org.apache.qpid.transport.util.Logger;
/**
@@ -50,6 +40,7 @@ import static org.apache.qpid.transport.Connection.State.*;
public class ClientDelegate extends ConnectionDelegate
{
+ private static final Logger log = Logger.get(ClientDelegate.class);
private String vhost;
private String username;
@@ -121,7 +112,14 @@ public class ClientDelegate extends ConnectionDelegate
@Override public void connectionTune(Connection conn, ConnectionTune tune)
{
conn.setChannelMax(tune.getChannelMax());
- conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+ int hb_interval = calculateHeartbeatInterval(conn,
+ tune.getHeartbeatMin(),
+ tune.getHeartbeatMax()
+ );
+ conn.connectionTuneOk(tune.getChannelMax(),
+ tune.getMaxFrameSize(),
+ hb_interval);
+ conn.setIdleTimeout(hb_interval*1000);
conn.connectionOpen(vhost, null, Option.INSIST);
}
@@ -134,5 +132,22 @@ public class ClientDelegate extends ConnectionDelegate
{
throw new UnsupportedOperationException();
}
-
+
+ /**
+ * Currently the spec specified the min and max for heartbeat using secs
+ */
+ private int calculateHeartbeatInterval(Connection conn,int min, int max)
+ {
+ long l = conn.getIdleTimeout()/1000;
+ if (l !=0 && l >= min && l <= max)
+ {
+ return (int)l;
+ }
+ else
+ {
+ log.warn("Ignoring the idle timeout %s set by the connection," +
+ " using the brokers max value %s", l,max);
+ return max;
+ }
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 56cbf5ee13..2f7e1490ab 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -83,7 +83,8 @@ public class Connection extends ConnectionInvoker
private String locale;
private SaslServer saslServer;
private SaslClient saslClient;
-
+ private long idleTimeout = 0;
+
// want to make this final
private int _connectionId;
@@ -114,6 +115,7 @@ public class Connection extends ConnectionInvoker
public void setSender(Sender<ProtocolEvent> sender)
{
this.sender = sender;
+ sender.setIdleTimeout(idleTimeout);
}
void setState(State state)
@@ -497,6 +499,20 @@ public class Connection extends ConnectionInvoker
}
}
+ public void setIdleTimeout(long l)
+ {
+ idleTimeout = l;
+ if (sender != null)
+ {
+ sender.setIdleTimeout(l);
+ }
+ }
+
+ public long getIdleTimeout()
+ {
+ return idleTimeout;
+ }
+
public String toString()
{
return String.format("conn:%x", System.identityHashCode(this));
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Sender.java b/java/common/src/main/java/org/apache/qpid/transport/Sender.java
index 9a6f675d7f..475289c83f 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Sender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Sender.java
@@ -28,6 +28,7 @@ package org.apache.qpid.transport;
public interface Sender<T>
{
+ void setIdleTimeout(long l);
void send(T msg);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 7908700cbe..d99ee72d14 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -20,7 +20,15 @@
*/
package org.apache.qpid.transport.network;
-import org.apache.qpid.transport.codec.BBEncoder;
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
@@ -31,13 +39,7 @@ import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.Struct;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.qpid.transport.network.Frame.*;
-
-import static java.lang.Math.*;
+import org.apache.qpid.transport.codec.BBEncoder;
/**
@@ -235,5 +237,9 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
throw new IllegalArgumentException("" + error);
}
-
+
+ public void setIdleTimeout(long l)
+ {
+ sender.setIdleTimeout(l);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 60abb326f6..a8dee5aaa1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -143,9 +143,6 @@ final class IoReceiver implements Runnable
t.getMessage().equalsIgnoreCase("socket closed") &&
closed.get()))
{
- log.error(t, "===========================================================");
- log.error(t, "Exception");
- log.error(t, "===========================================================");
receiver.exception(t);
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 29f0c766fc..00652e2927 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.transport.network.io;
+import static org.apache.qpid.transport.util.Functions.mod;
+
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
@@ -30,8 +32,6 @@ import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.util.Logger;
-import static org.apache.qpid.transport.util.Functions.*;
-
public final class IoSender implements Runnable, Sender<ByteBuffer>
{
@@ -56,6 +56,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
+ private long idleTimeout;
private volatile Throwable exception = null;
@@ -223,8 +224,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
public void run()
{
- final int size = buffer.length;
-
+ final int size = buffer.length;
while (true)
{
final int hd = head;
@@ -294,4 +294,16 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
+ public void setIdleTimeout(long l)
+ {
+ try
+ {
+ socket.setSoTimeout((int)l*2);
+ idleTimeout = l;
+ }
+ catch (Exception e)
+ {
+ throw new SenderException(e);
+ }
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
index 69d4061e0c..fbedf14312 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java
@@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
-
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -77,5 +76,15 @@ public class MinaSender implements Sender<java.nio.ByteBuffer>
CloseFuture closed = session.close();
closed.join();
}
-
+
+ public void setIdleTimeout(long l)
+ {
+ //noop
+ }
+
+ public long getIdleTimeout()
+ {
+ return 0;
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
index 8792fce142..5196505c2d 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioSender.java
@@ -118,4 +118,9 @@ public class NioSender implements Sender<java.nio.ByteBuffer>
}
}
}
+
+ public void setIdleTimeout(long l)
+ {
+ //noop
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
index 9d60a2ad52..5f456f28b1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/ssl/SSLSender.java
@@ -232,4 +232,9 @@ public class SSLSender implements Sender<ByteBuffer>
{
return engineState;
}
+
+ public void setIdleTimeout(long l)
+ {
+ delegate.setIdleTimeout(l);
+ }
}