From 3782211ac3babfdf4bad5d8405875f4f8d75d3fb Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 25 Nov 2012 17:15:43 +0000 Subject: QPID-2796 : Implement hearbeating in Java Broker (all protocol versions) and Java Client (0-8/9/9-1 path) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1413376 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/protocol/AMQProtocolEngine.java | 24 +- .../protocol/MultiVersionProtocolEngine.java | 36 +++ .../qpid/server/protocol/ProtocolEngine_0_10.java | 55 ++++- .../qpid/server/protocol/ProtocolEngine_1_0_0.java | 13 ++ .../server/protocol/ProtocolEngine_1_0_0_SASL.java | 15 +- .../qpid/server/transport/ServerConnection.java | 18 ++ .../server/transport/ServerConnectionDelegate.java | 20 +- .../qpid/client/AMQConnectionDelegate_0_10.java | 6 +- .../qpid/client/AMQConnectionDelegate_8_0.java | 4 +- .../qpid/client/protocol/AMQProtocolHandler.java | 16 ++ .../org/apache/qpid/protocol/ProtocolEngine.java | 5 +- .../java/org/apache/qpid/transport/Connection.java | 49 +++- .../network/IncomingNetworkTransport.java | 4 +- .../qpid/transport/network/NetworkConnection.java | 4 + .../network/OutgoingNetworkTransport.java | 4 +- .../org/apache/qpid/transport/network/Ticker.java | 29 +++ .../qpid/transport/network/TransportActivity.java | 33 +++ .../transport/network/io/IdleTimeoutTicker.java | 87 +++++++ .../transport/network/io/IoNetworkConnection.java | 31 ++- .../transport/network/io/IoNetworkTransport.java | 26 ++- .../qpid/transport/network/io/IoReceiver.java | 76 +++++- .../qpid/transport/TestNetworkConnection.java | 12 + .../qpid/transport/network/TransportTest.java | 6 +- .../network/io/IdleTimeoutTickerTest.java | 257 +++++++++++++++++++++ 24 files changed, 781 insertions(+), 49 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java create mode 100644 java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 976d7fd28a..72c21d357e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -150,6 +151,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private boolean _blocking; private final Lock _receivedLock; + private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) { @@ -541,7 +544,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final ByteBuffer buf = asByteBuffer(frame); _writtenBytes += buf.remaining(); _sender.send(buf); - _lastIoTime = System.currentTimeMillis(); + final long time = System.currentTimeMillis(); + _lastIoTime = time; + _lastWriteTime.set(time); + if(!_deferFlush) { _sender.flush(); @@ -1058,12 +1064,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void readerIdle() { - // Nothing + // TODO - enforce disconnect on lack of inbound data } public synchronized void writerIdle() { - _sender.send(asByteBuffer(HeartbeatBody.FRAME)); + writeFrame(HeartbeatBody.FRAME); } public void exception(Throwable throwable) @@ -1461,4 +1467,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { return _receivedLock; } + + @Override + public long getLastReadTime() + { + return _lastReceivedTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime.get(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index cb7680e9b6..707f02d4a5 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -217,6 +217,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _delegate.getLastReadTime(); + } + + @Override + public long getLastWriteTime() + { + return _delegate.getLastWriteTime(); + } + private static interface DelegateCreator { @@ -409,6 +421,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public long getLastReadTime() + { + return 0; + } + + @Override + public long getLastWriteTime() + { + return 0; + } + public long getConnectionId() { return _id; @@ -566,5 +590,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { } + + @Override + public long getLastReadTime() + { + return 0; + } + + @Override + public long getLastWriteTime() + { + return 0; + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index d7d26cc772..7d263b517c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -44,6 +44,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private ServerConnection _connection; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); + private long _lastReadTime; + private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network, @@ -71,13 +73,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { _network = network; - _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE)); + _connection.setNetworkConnection(network); + _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); _connection.setPeerPrincipal(_network.getPeerPrincipal()); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false)); _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false)); } + private Sender wrapSender(final Sender sender) + { + return new Sender() + { + @Override + public void setIdleTimeout(int i) + { + sender.setIdleTimeout(i); + + } + + @Override + public void send(ByteBuffer msg) + { + _lastWriteTime = System.currentTimeMillis(); + sender.send(msg); + + } + + @Override + public void flush() + { + sender.flush(); + + } + + @Override + public void close() + { + sender.close(); + + } + }; + } + + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -90,6 +140,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void received(final ByteBuffer buf) { + _lastReadTime = System.currentTimeMillis(); super.received(buf); _connection.receivedComplete(); } @@ -106,7 +157,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void writerIdle() { - //Todo + _connection.doHeartbeat(); } public void readerIdle() diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java index da70e497d6..274f206c85 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java @@ -53,6 +53,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa //private NetworkConnection _networkDriver; private long _readBytes; private long _writtenBytes; + private long _lastReadTime; + private long _lastWriteTime; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _conn; @@ -178,6 +180,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa public synchronized void received(ByteBuffer msg) { + _lastReadTime = System.currentTimeMillis(); if(RAW_LOGGER.isLoggable(Level.FINE)) { ByteBuffer dup = msg.duplicate(); @@ -320,6 +323,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa synchronized(_sendLock) { + _lastWriteTime = System.currentTimeMillis(); if(FRAME_LOGGER.isLoggable(Level.FINE)) { FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); @@ -374,4 +378,13 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa return _connectionId; } + public long getLastReadTime() + { + return _lastReadTime; + } + + public long getLastWriteTime() + { + return _lastWriteTime; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java index 71d6df27e0..634c5e6255 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java @@ -51,6 +51,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { private long _readBytes; private long _writtenBytes; + + private long _lastReadTime; + private long _lastWriteTime; private final IApplicationRegistry _appRegistry; private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _conn; @@ -222,6 +225,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut public synchronized void received(ByteBuffer msg) { + _lastReadTime = System.currentTimeMillis(); if(RAW_LOGGER.isLoggable(Level.FINE)) { ByteBuffer dup = msg.duplicate(); @@ -364,7 +368,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut synchronized(_sendLock) { - + _lastWriteTime = System.currentTimeMillis(); if(FRAME_LOGGER.isLoggable(Level.FINE)) { FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody()); @@ -425,4 +429,13 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut return _connectionId; } + public long getLastReadTime() + { + return _lastReadTime; + } + + public long getLastWriteTime() + { + return _lastWriteTime; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index ce3ede2dba..58de6a0cdf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -48,6 +48,7 @@ import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.network.NetworkConnection; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; @@ -68,6 +69,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Principal _peerPrincipal; + private NetworkConnection _networkConnection; public ServerConnection(final long connectionId) { @@ -490,4 +492,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel, { super.setLocalAddress(localAddress); } + + public void setNetworkConnection(NetworkConnection network) + { + _networkConnection = network; + } + + public NetworkConnection getNetworkConnection() + { + return _networkConnection; + } + + public void doHeartbeat() + { + super.doHeartBeat(); + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 70f5afe5ac..f48121f9f0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -43,6 +43,8 @@ import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.State; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; +import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,14 +228,18 @@ public class ServerConnectionDelegate extends ServerDelegate return; } - setConnectionTuneOkChannelMax(sconn, okChannelMax); - } + if(ok.hasHeartbeat()) + { + final int heartbeat = ok.getHeartbeat(); + if(heartbeat > 0) + { + final NetworkConnection networkConnection = sconn.getNetworkConnection(); + networkConnection.setMaxReadIdle(2 * heartbeat); + networkConnection.setMaxWriteIdle(heartbeat); + } + } - @Override - protected int getHeartbeatMax() - { - //TODO: implement broker support for actually sending heartbeats - return 0; + setConnectionTuneOkChannelMax(sconn, okChannelMax); } @Override diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 5dd6e55e64..a8cf947f6d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -484,10 +484,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; _logger.warn("JVM arg -Didle_timeout= is deprecated, please use -Dqpid.heartbeat="); } - else + else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) { heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); } + else + { + heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); + } return heartbeat; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 740a81b939..25af7003d0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -122,7 +122,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + _conn.getProtocolHandler()); _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index af89000c5c..04d57c9fa2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -178,6 +178,8 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; private Sender _sender; + private long _lastReadTime = System.currentTimeMillis(); + private long _lastWriteTime = System.currentTimeMillis(); /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -442,6 +444,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { _readBytes += msg.remaining(); + _lastReadTime = System.currentTimeMillis(); try { final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -560,6 +563,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); + _lastWriteTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); if(flush) @@ -882,6 +886,18 @@ public class AMQProtocolHandler implements ProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + protected Sender getSender() { return _sender; diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 7ca588946b..6774d0a45a 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -23,6 +23,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportActivity; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -31,7 +32,7 @@ import java.nio.ByteBuffer; * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received * decodes it and then process the result. */ -public interface ProtocolEngine extends Receiver +public interface ProtocolEngine extends Receiver, TransportActivity { // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); @@ -56,6 +57,6 @@ public interface ProtocolEngine extends Receiver void readerIdle(); - public void setNetworkConnection(NetworkConnection network, Sender sender); + public void setNetworkConnection(NetworkConnection network, Sender sender); } \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index f95a0d215b..3bff9aa346 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -21,12 +21,7 @@ package org.apache.qpid.transport; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.*; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.network.security.SecurityLayerFactory; import org.apache.qpid.transport.util.Logger; @@ -73,6 +68,9 @@ public class Connection extends ConnectionInvoker //Usable channels are numbered 0 to - 1 public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; + private long _lastSendTime; + private long _lastReadTime; + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -231,7 +229,8 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver); + NetworkConnection network = transport.connect(settings, secureReceiver, null); + setRemoteAddress(network.getRemoteAddress()); setLocalAddress(network.getLocalAddress()); @@ -368,6 +367,7 @@ public class Connection extends ConnectionInvoker public void received(ProtocolEvent event) { + _lastReadTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("RECV: [%s] %s", this, event); @@ -377,6 +377,7 @@ public class Connection extends ConnectionInvoker public void send(ProtocolEvent event) { + _lastSendTime = System.currentTimeMillis(); if(log.isDebugEnabled()) { log.debug("SEND: [%s] %s", this, event); @@ -745,4 +746,38 @@ public class Connection extends ConnectionInvoker sessionDetached.setCode(sessionDetachCode); invoke(sessionDetached); } + + + protected void doHeartBeat() + { + connectionHeartbeat(); + } + + private class ConnectionActivity implements TransportActivity + { + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastSendTime; + } + + @Override + public void writerIdle() + { + connectionHeartbeat(); + } + + @Override + public void readerIdle() + { + // TODO + + } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java index 4d4274278f..8437ef1a94 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -27,5 +27,7 @@ import javax.net.ssl.SSLContext; public interface IncomingNetworkTransport extends NetworkTransport { - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext); + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext); } \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 12c42d6643..050d194c47 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -50,4 +50,8 @@ public interface NetworkConnection void setPeerPrincipal(Principal principal); Principal getPeerPrincipal(); + + int getMaxReadIdle(); + + int getMaxWriteIdle(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java index 92774f5842..45231aa05d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -29,5 +29,7 @@ public interface OutgoingNetworkTransport extends NetworkTransport { public NetworkConnection getConnection(); - public NetworkConnection connect(ConnectionSettings settings, Receiver delegate); + public NetworkConnection connect(ConnectionSettings settings, + Receiver delegate, + TransportActivity transportActivity); } \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java b/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java new file mode 100644 index 0000000000..210b014a57 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Ticker.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +public interface Ticker +{ + int getTimeToNextTick(long currentTime); + + int tick(long currentTime); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java b/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java new file mode 100644 index 0000000000..2ee336d9b2 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/TransportActivity.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network; + +public interface TransportActivity +{ + long getLastReadTime(); + + long getLastWriteTime(); + + void writerIdle(); + + void readerIdle(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java new file mode 100644 index 0000000000..b8a8d42c7c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.network.TransportActivity; + +class IdleTimeoutTicker implements Ticker +{ + private final TransportActivity _transport; + private final int _defaultTimeout; + private NetworkConnection _connection; + + public IdleTimeoutTicker(TransportActivity transport, int defaultTimeout) + { + _transport = transport; + _defaultTimeout = defaultTimeout; + } + + @Override + public int getTimeToNextTick(long currentTime) + { + long nextTime = -1; + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + + if(maxReadIdle != 0) + { + nextTime = _transport.getLastReadTime() + maxReadIdle; + } + + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + + if(maxWriteIdle != 0) + { + long writeTime = _transport.getLastWriteTime() + maxWriteIdle; + if(nextTime == -1l || writeTime < nextTime) + { + nextTime = writeTime; + } + } + return nextTime == -1 ? _defaultTimeout : (int) (nextTime - currentTime); + } + + @Override + public int tick(long currentTime) + { + // writer Idle + long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); + if(maxWriteIdle != 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) + { + _transport.writerIdle(); + } + // reader Idle + final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); + if(maxReadIdle != 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) + { + + _transport.readerIdle(); + } + return getTimeToNextTick(currentTime); + } + + public void setConnection(NetworkConnection connection) + { + _connection = connection; + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 2658296c5f..f5c09ac2cc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -26,7 +26,9 @@ import java.nio.ByteBuffer; import java.security.Principal; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.NetworkConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,23 @@ public class IoNetworkConnection implements NetworkConnection private final IoSender _ioSender; private final IoReceiver _ioReceiver; private Principal _principal; + private int _maxReadIdle; + private int _maxWriteIdle; public IoNetworkConnection(Socket socket, Receiver delegate, - int sendBufferSize, int receiveBufferSize, long timeout) + int sendBufferSize, int receiveBufferSize, long timeout) + { + this(socket,delegate,sendBufferSize,receiveBufferSize,timeout,null); + } + + public IoNetworkConnection(Socket socket, Receiver delegate, + int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) { _socket = socket; _timeout = timeout; _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioReceiver.setTicker(ticker); _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); @@ -88,14 +99,12 @@ public class IoNetworkConnection implements NetworkConnection public void setMaxWriteIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxWriteIdle = sec; } public void setMaxReadIdle(int sec) { - // TODO implement support for setting heartbeating config in this way - // Currently a socket timeout is used in IoSender + _maxReadIdle = sec; } @Override @@ -109,4 +118,16 @@ public class IoNetworkConnection implements NetworkConnection { return _principal; } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 4cb526ef73..3f1cd3519b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -41,9 +41,8 @@ import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.*; + import org.slf4j.LoggerFactory; public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport @@ -56,7 +55,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private IoNetworkConnection _connection; private AcceptingThread _acceptor; - public NetworkConnection connect(ConnectionSettings settings, Receiver delegate) + public NetworkConnection connect(ConnectionSettings settings, + Receiver delegate, + TransportActivity transportActivity) { int sendBufferSize = settings.getWriteBufferSize(); int receiveBufferSize = settings.getReadBufferSize(); @@ -91,7 +92,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT); + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); _connection.start(); } catch(Exception e) @@ -128,7 +131,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet return _connection; } - public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext) + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) { try { @@ -149,6 +154,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet private ProtocolEngineFactory _factory; private SSLContext _sslContext; private ServerSocket _serverSocket; + private int _timeout; private AcceptingThread(NetworkTransportConfiguration config, ProtocolEngineFactory factory, @@ -157,6 +163,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet _config = config; _factory = factory; _sslContext = sslContext; + _timeout = TIMEOUT; InetSocketAddress address = config.getAddress(); @@ -217,6 +224,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet { socket = _serverSocket.accept(); socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(_timeout); final Integer sendBufferSize = _config.getSendBufferSize(); final Integer receiveBufferSize = _config.getReceiveBufferSize(); @@ -226,7 +234,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet ProtocolEngine engine = _factory.newProtocolEngine(); - NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, TIMEOUT); + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, + ticker); + ticker.setConnection(connection); if(_sslContext != null) { @@ -293,6 +304,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet } } } + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index a36e5fedee..11f28a2aee 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.util.Logger; import javax.net.ssl.SSLSocket; @@ -31,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +53,8 @@ final class IoReceiver implements Runnable, Closeable private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; private static final boolean shutdownBroken; + + private Ticker _ticker; static { String osName = System.getProperty("os.name"); @@ -136,7 +140,7 @@ final class IoReceiver implements Runnable, Closeable { final int threshold = bufferSize / 2; - // I set the read buffer size simillar to SO_RCVBUF + // I set the read buffer size similar to SO_RCVBUF // Haven't tested with a lower value to see if it's better or worse byte[] buffer = new byte[bufferSize]; try @@ -144,17 +148,60 @@ final class IoReceiver implements Runnable, Closeable InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + long currentTime; + while(read != -1) { - if (read > 0) + try + { + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + { + if (read > 0) + { + ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); + receiver.received(b); + offset+=read; + if (offset > threshold) + { + offset = 0; + buffer = new byte[bufferSize]; + } + } + currentTime = System.currentTimeMillis(); + + if(_ticker != null) + { + final int tick = _ticker.getTimeToNextTick(currentTime); + try + { + if(!socket.isClosed()) + { + socket.setSoTimeout(tick <= 0 ? 1 : tick); + } + } + catch(SocketException e) + { + // ignore - closed socket + } + } + } + } + catch (SocketTimeoutException e) { - ByteBuffer b = ByteBuffer.wrap(buffer,offset,read); - receiver.received(b); - offset+=read; - if (offset > threshold) + currentTime = System.currentTimeMillis(); + if(_ticker != null) { - offset = 0; - buffer = new byte[bufferSize]; + final int tick = _ticker.tick(currentTime); + if(!socket.isClosed()) + { + try + { + socket.setSoTimeout(tick <= 0 ? 1 : tick ); + } + catch(SocketException ex) + { + // ignore - closed socket + } + } } } } @@ -195,4 +242,15 @@ final class IoReceiver implements Runnable, Closeable return !brokenClose && !sslSocketClosed; } + public Ticker getTicker() + { + return _ticker; + } + + public void setTicker(Ticker ticker) + { + _ticker = ticker; + } + + } diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java index 893f66c5ff..a19c2e7e43 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java +++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -83,6 +83,18 @@ public class TestNetworkConnection implements NetworkConnection return null; } + @Override + public int getMaxReadIdle() + { + return 0; + } + + @Override + public int getMaxWriteIdle() + { + return 0; + } + public void setMaxWriteIdle(int idleTime) { diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java index b4c0981131..bf9a5843d6 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -127,7 +127,9 @@ public class TransportTest extends QpidTestCase throw new UnsupportedOperationException(); } - public NetworkConnection connect(ConnectionSettings settings, Receiver delegate) + public NetworkConnection connect(ConnectionSettings settings, + Receiver delegate, + TransportActivity transportActivity) { throw new UnsupportedOperationException(); } @@ -147,7 +149,7 @@ public class TransportTest extends QpidTestCase } public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, SSLContext sslContext) + ProtocolEngineFactory factory, SSLContext sslContext) { throw new UnsupportedOperationException(); } diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java new file mode 100644 index 0000000000..5cdd7a8597 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java @@ -0,0 +1,257 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.io; + +import junit.framework.TestCase; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportActivity; + +public class IdleTimeoutTickerTest extends TestCase implements TransportActivity, NetworkConnection +{ + private IdleTimeoutTicker _ticker; + private static final int DEFAULT_TIMEOUT = 567890; + private long _lastReadTime; + private long _lastWriteTime; + private long _currentTime; + private int _maxWriteIdle; + private int _maxReadIdle; + private boolean _readerIdle; + private boolean _writerIdle; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _ticker = new IdleTimeoutTicker(this, DEFAULT_TIMEOUT); + _ticker.setConnection(this); + _readerIdle = false; + _writerIdle = false; + _lastReadTime = 0l; + _lastWriteTime = 0l; + _maxReadIdle = 0; + _maxWriteIdle = 0; + } + + public void testNoIdle() throws Exception + { + _maxReadIdle = 4; + _maxWriteIdle = 2; + _lastReadTime = 0; + _lastWriteTime = 1500; + _currentTime = 3000; + // Current time = 3s, + // last read = 0s, max read idle = 4s, should check in 1s + // last write = 1.5s, max write idle = 2s, should check in 0.5s + long nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", 500l, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + + // Current time = 3.4s, + // last read = 0s, max read idle = 4s, should check in 0.6s + // last write = 3.1s, max write idle = 2s, should check in 1.7s + _lastWriteTime = 3100; + _currentTime = 3400; + nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", 600l, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + _maxReadIdle = 0; + nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", 1700l, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + _maxWriteIdle = 0; + nextTime = _ticker.tick(_currentTime); + assertEquals("Incorrect next tick calculation", DEFAULT_TIMEOUT, nextTime); + assertFalse("Incorrectly caused reader idle", _readerIdle); + assertFalse("Incorrectly caused writer idle", _writerIdle); + + } + + public void testReaderIdle() throws Exception + { + _maxReadIdle = 4; + _maxWriteIdle = 0; + _lastReadTime = 0; + _lastWriteTime = 2500; + _currentTime = 4000; + // Current time = 4s, + // last read = 0s, max read idle = 4s, reader idle + long nextTime = _ticker.tick(_currentTime); + + assertTrue(_readerIdle); + assertFalse(_writerIdle); + + _readerIdle = false; + + // last write = 2.5s, max write idle = 2s, should check in 0.5s + _maxWriteIdle = 2; + nextTime = _ticker.tick(_currentTime); + assertTrue(_readerIdle); + assertFalse(_writerIdle); + + _readerIdle = false; + // last write = 1.5s, max write idle = 2s, should check in 0.5s + + _lastWriteTime = 1500; + nextTime = _ticker.tick(_currentTime); + + assertTrue(_readerIdle); + assertTrue(_writerIdle); + + } + + public void testWriterIdle() throws Exception + { + _maxReadIdle = 0; + _maxWriteIdle = 2; + _lastReadTime = 0; + _lastWriteTime = 1500; + _currentTime = 4000; + // Current time = 4s, + // last write = 1.5s, max write idle = 2s, writer idle + long nextTime = _ticker.tick(_currentTime); + + assertTrue(_writerIdle); + assertFalse(_readerIdle); + assertEquals(2000l,nextTime); + + _writerIdle = false; + _lastWriteTime = 1500; + _maxReadIdle = 5; + + nextTime = _ticker.tick(_currentTime); + + assertTrue(_writerIdle); + assertFalse(_readerIdle); + assertEquals(1000l,nextTime); + + } + + //------------------------------------------------------------------------- + // Implement TransportActivity methods + //------------------------------------------------------------------------- + + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + + @Override + public void writerIdle() + { + _writerIdle = true; + _lastWriteTime = _currentTime; + } + + @Override + public void readerIdle() + { + _readerIdle = true; + } + + //------------------------------------------------------------------------- + // Implement NetworkConnection methods + // Only actually use those relating to idle timeouts + //------------------------------------------------------------------------- + + @Override + public Sender getSender() + { + return null; + } + + @Override + public void start() + { + } + + @Override + public void close() + { + } + + @Override + public SocketAddress getRemoteAddress() + { + return null; + } + + @Override + public SocketAddress getLocalAddress() + { + return null; + } + + @Override + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + @Override + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public void setPeerPrincipal(Principal principal) + { + } + + @Override + public Principal getPeerPrincipal() + { + return null; + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } +} -- cgit v1.2.1