diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2006-10-20 11:48:50 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2006-10-20 11:48:50 +0000 |
| commit | 99eff7255998614695fb956f5904eb7c6c56a385 (patch) | |
| tree | 181d14e527afdf7665a934185e2ffea853556f71 /java/client/src | |
| parent | ceaf51ea03c17562a65bab1235dd7b11a36a23d1 (diff) | |
| download | qpid-python-99eff7255998614695fb956f5904eb7c6c56a385.tar.gz | |
Modified to maintain a reference to the lastWriteFuture. This is then used when closing the ProtocolSession to join on so that we can be sure all data has been written to the broker. A time out of 2 minutes ensures that the client doesn't hang for ever if the broker fails.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@466078 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java index b181490fdd..acb60f63ae 100644 --- a/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -47,6 +47,9 @@ import java.util.concurrent.ConcurrentMap; */ public class AMQProtocolSession implements ProtocolVersionList { + + private static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; + private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived"; @@ -59,6 +62,8 @@ public class AMQProtocolSession implements ProtocolVersionList private final IoSession _minaProtocolSession; + private WriteFuture _lastWriteFuture; + /** * The handler from which this session was created and which is used to handle protocol events. * We send failover events to the handler. @@ -255,7 +260,7 @@ public class AMQProtocolSession implements ProtocolVersionList */ public void writeFrame(AMQDataBlock frame) { - _minaProtocolSession.write(frame); + writeFrame(frame, false); } public void writeFrame(AMQDataBlock frame, boolean wait) @@ -265,6 +270,10 @@ public class AMQProtocolSession implements ProtocolVersionList { f.join(); } + else + { + _lastWriteFuture = f; + } } public void addSessionByChannel(int channelId, AMQSession session) @@ -342,6 +351,12 @@ public class AMQProtocolSession implements ProtocolVersionList public void closeProtocolSession() { + _logger.debug("Waiting for last write to join."); + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + } + _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); future.join(); |
