summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java17
1 files changed, 16 insertions, 1 deletions
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b181490fdd..acb60f63ae 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -47,6 +47,9 @@ import java.util.concurrent.ConcurrentMap;
*/
public class AMQProtocolSession implements ProtocolVersionList
{
+
+ private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
@@ -59,6 +62,8 @@ public class AMQProtocolSession implements ProtocolVersionList
private final IoSession _minaProtocolSession;
+ private WriteFuture _lastWriteFuture;
+
/**
* The handler from which this session was created and which is used to handle protocol events.
* We send failover events to the handler.
@@ -255,7 +260,7 @@ public class AMQProtocolSession implements ProtocolVersionList
*/
public void writeFrame(AMQDataBlock frame)
{
- _minaProtocolSession.write(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
@@ -265,6 +270,10 @@ public class AMQProtocolSession implements ProtocolVersionList
{
f.join();
}
+ else
+ {
+ _lastWriteFuture = f;
+ }
}
public void addSessionByChannel(int channelId, AMQSession session)
@@ -342,6 +351,12 @@ public class AMQProtocolSession implements ProtocolVersionList
public void closeProtocolSession()
{
+ _logger.debug("Waiting for last write to join.");
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ }
+
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
future.join();