diff options
Diffstat (limited to 'java/common/src')
| -rw-r--r-- | java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java | 22 |
1 files changed, 20 insertions, 2 deletions
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java index 54e9ec28ef..c9032554fa 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java @@ -21,7 +21,9 @@ package org.apache.qpidity.transport.network.mina; import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IoSession; +import org.apache.mina.common.WriteFuture; import org.apache.qpidity.transport.Sender; @@ -33,8 +35,11 @@ import org.apache.qpidity.transport.Sender; public class MinaSender implements Sender<java.nio.ByteBuffer> { + private static final int TIMEOUT = 2*60*1000; private final IoSession session; + private final Object lock = new Object(); + private WriteFuture lastWrite = null; public MinaSender(IoSession session) { @@ -43,12 +48,25 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> public void send(java.nio.ByteBuffer buf) { - session.write(ByteBuffer.wrap(buf)); + synchronized (lock) + { + lastWrite = session.write(ByteBuffer.wrap(buf)); + } } public void close() { - session.close(); + // MINA will sometimes throw away in-progress writes when you + // ask it to close + synchronized (lock) + { + if (lastWrite != null) + { + lastWrite.join(); + } + } + CloseFuture closed = session.close(); + closed.join(); } } |
