summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-10-20 11:48:50 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-10-20 11:48:50 +0000
commit99eff7255998614695fb956f5904eb7c6c56a385 (patch)
tree181d14e527afdf7665a934185e2ffea853556f71 /java/client
parentceaf51ea03c17562a65bab1235dd7b11a36a23d1 (diff)
downloadqpid-python-99eff7255998614695fb956f5904eb7c6c56a385.tar.gz
Modified to maintain a reference to the lastWriteFuture. This is then used when closing the ProtocolSession to join on so that we can be sure all data has been written to the broker. A time out of 2 minutes ensures that the client doesn't hang for ever if the broker fails.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@466078 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java17
1 files changed, 16 insertions, 1 deletions
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b181490fdd..acb60f63ae 100644
--- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -47,6 +47,9 @@ import java.util.concurrent.ConcurrentMap;
*/
public class AMQProtocolSession implements ProtocolVersionList
{
+
+ private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
+
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
@@ -59,6 +62,8 @@ public class AMQProtocolSession implements ProtocolVersionList
private final IoSession _minaProtocolSession;
+ private WriteFuture _lastWriteFuture;
+
/**
* The handler from which this session was created and which is used to handle protocol events.
* We send failover events to the handler.
@@ -255,7 +260,7 @@ public class AMQProtocolSession implements ProtocolVersionList
*/
public void writeFrame(AMQDataBlock frame)
{
- _minaProtocolSession.write(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
@@ -265,6 +270,10 @@ public class AMQProtocolSession implements ProtocolVersionList
{
f.join();
}
+ else
+ {
+ _lastWriteFuture = f;
+ }
}
public void addSessionByChannel(int channelId, AMQSession session)
@@ -342,6 +351,12 @@ public class AMQProtocolSession implements ProtocolVersionList
public void closeProtocolSession()
{
+ _logger.debug("Waiting for last write to join.");
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ }
+
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
future.join();