diff options
Diffstat (limited to 'java')
| -rw-r--r-- | java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index b181490fdd..acb60f63ae 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -47,6 +47,9 @@ import java.util.concurrent.ConcurrentMap; */ public class AMQProtocolSession implements ProtocolVersionList { + + private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; + private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; @@ -59,6 +62,8 @@ public class AMQProtocolSession implements ProtocolVersionList private final IoSession _minaProtocolSession; + private WriteFuture _lastWriteFuture; + /** * The handler from which this session was created and which is used to handle protocol events. * We send failover events to the handler. @@ -255,7 +260,7 @@ public class AMQProtocolSession implements ProtocolVersionList */ public void writeFrame(AMQDataBlock frame) { - _minaProtocolSession.write(frame); + writeFrame(frame, false); } public void writeFrame(AMQDataBlock frame, boolean wait) @@ -265,6 +270,10 @@ public class AMQProtocolSession implements ProtocolVersionList { f.join(); } + else + { + _lastWriteFuture = f; + } } public void addSessionByChannel(int channelId, AMQSession session) @@ -342,6 +351,12 @@ public class AMQProtocolSession implements ProtocolVersionList public void closeProtocolSession() { + _logger.debug("Waiting for last write to join."); + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + } + _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); future.join(); |
