From 3d4b8c94a5498cb8a1209d376abe79973570f44b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 19 Oct 2012 20:59:40 +0000 Subject: QPID-4381 : add heartbeating to the AMQP 1.0 java client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1400284 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/amqp_1_0/codec/FrameWriter.java | 27 +++++++++++++++++----- .../qpid/amqp_1_0/framing/ConnectionHandler.java | 23 +++++++++++++++++- .../amqp_1_0/transport/ConnectionEndpoint.java | 13 +++++++++++ 3 files changed, 56 insertions(+), 7 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java index dbf9306366..95e327852b 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/FrameWriter.java @@ -77,13 +77,21 @@ public class FrameWriter implements ValueWriter { case SIZE_0: - _typeWriter.setValue(_frame.getFrameBody()); - int payloadLength = _payload == null ? 0 : _payload.remaining(); - _size = _typeWriter.writeToBuffer(remaining > 8 - ? (ByteBuffer)buffer.duplicate().position(buffer.position()+8) - : ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength; + if(_typeWriter!=null) + { + _typeWriter.setValue(_frame.getFrameBody()); + + + _size = _typeWriter.writeToBuffer(remaining > 8 + ? (ByteBuffer)buffer.duplicate().position(buffer.position()+8) + : ByteBuffer.wrap(EMPTY_BYTE_ARRAY)) + 8 + payloadLength; + } + else + { + _size = 8 + payloadLength; + } if(remaining >= 4) { buffer.putInt(_size); @@ -239,7 +247,14 @@ public class FrameWriter implements ValueWriter _size = -1; _payload = null; final Object frameBody = frame.getFrameBody(); - _typeWriter = _registry.getValueWriter(frameBody); + if(frameBody!=null) + { + _typeWriter = _registry.getValueWriter(frameBody); + } + else + { + _typeWriter = null; + } _payload = frame.getPayload() == null ? null : frame.getPayload().duplicate(); } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java index 78bed8a71e..f4cd06f3ef 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java @@ -103,6 +103,7 @@ public class ConnectionHandler private boolean _setForClose; private boolean _closed; + private long _nextHeartbeat; public FrameOutput(final ConnectionEndpoint conn) { @@ -165,14 +166,34 @@ public class ConnectionHandler { synchronized(_conn.getLock()) { + long time = System.currentTimeMillis(); try { AMQFrame frame = null; while(!closed() && (frame = _queue.poll()) == null && wait) { - _conn.getLock().wait(); + _conn.getLock().wait(_conn.getIdleTimeout()/2); + + if(_conn.getIdleTimeout()>0) + { + time = System.currentTimeMillis(); + + if(frame == null && time > _nextHeartbeat) + { + frame = new TransportFrame((short) 0,null); + break; + } + } } + + + + if(frame != null) + { + _nextHeartbeat = time + _conn.getIdleTimeout()/2; + + } if(frame == _endOfFrameMarker) { _closed = true; diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 70e990d92e..17bc2caf5f 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -81,6 +81,8 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private boolean _closedForInput; private boolean _closedForOutput; + private long _idleTimeout; + private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance() .registerTransportLayer() .registerMessagingLayer() @@ -282,6 +284,11 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour _remoteContainerId = open.getContainerId(); + if(open.getIdleTimeOut() != null) + { + _idleTimeout = open.getIdleTimeOut().longValue(); + } + switch(_state) { case UNOPENED: @@ -316,6 +323,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour sendClose(new Close()); break; case CLOSE_SENT: + default: } } @@ -650,6 +658,11 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour return this; } + public synchronized long getIdleTimeout() + { + return _idleTimeout; + } + public synchronized void close() { switch(_state) -- cgit v1.2.1