diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-07-31 20:31:28 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-07-31 20:31:28 +0000 |
| commit | 9cd756275e59b44042664d4cf433c3d68d129a30 (patch) | |
| tree | 906299079d1a0e4957c36894f6bb2edee5074e1b /java/common/src | |
| parent | ce7ef65d0477f8b02b51839e2ea847709e64fae8 (diff) | |
| download | qpid-python-9cd756275e59b44042664d4cf433c3d68d129a30.tar.gz | |
QPID-1207: fixed io transport close to ensure threads shutdown properly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681474 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
3 files changed, 57 insertions, 23 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 5e64bdfba1..8ee2638729 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; /** * IoReceiver @@ -43,18 +44,50 @@ final class IoReceiver extends Thread private final Receiver<ByteBuffer> receiver; private final int bufferSize; private final Socket socket; + private final long timeout; + private final AtomicBoolean closed = new AtomicBoolean(false); - public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, int bufferSize) + public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, + int bufferSize, long timeout) { this.transport = transport; this.receiver = receiver; this.bufferSize = bufferSize; this.socket = transport.getSocket(); + this.timeout = timeout; - setName(String.format("IoReceive - %s", socket.getRemoteSocketAddress())); + setDaemon(true); + setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); start(); } + void close() + { + if (!closed.getAndSet(true)) + { + try + { + socket.shutdownInput(); + if (Thread.currentThread() != this) + { + join(timeout); + if (isAlive()) + { + throw new TransportException("join timed out"); + } + } + } + catch (InterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + public void run() { final int threshold = bufferSize / 2; @@ -67,7 +100,7 @@ final class IoReceiver extends Thread InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) + while (!closed.get() && (read = in.read(buffer, offset, bufferSize-offset)) != -1) { if (read > 0) { @@ -84,22 +117,12 @@ final class IoReceiver extends Thread } catch (Throwable t) { - receiver.exception(new TransportException("error in read thread", t)); + receiver.exception(t); } finally { - try - { - transport.getSender().close(); - } - catch (TransportException e) - { - log.error(e, "error closing"); - } - finally - { - receiver.closed(); - } + receiver.closed(); + transport.getSender().close(); } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 591b4bba3e..37910ade0d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,7 +24,6 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -55,7 +54,7 @@ final class IoSender extends Thread implements Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); - private IOException exception = null; + private volatile Throwable exception = null; public IoSender(IoTransport transport, int bufferSize, long timeout) @@ -74,6 +73,7 @@ final class IoSender extends Thread implements Sender<ByteBuffer> throw new TransportException("Error getting output stream for socket", e); } + setDaemon(true); setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); start(); } @@ -159,6 +159,11 @@ final class IoSender extends Thread implements Sender<ByteBuffer> public void close() { + close(true); + } + + void close(boolean reportException) + { if (!closed.getAndSet(true)) { synchronized (notEmpty) @@ -168,11 +173,15 @@ final class IoSender extends Thread implements Sender<ByteBuffer> try { - join(timeout); - if (isAlive()) + if (Thread.currentThread() != this) { - throw new TransportException("join timed out"); + join(timeout); + if (isAlive()) + { + throw new TransportException("join timed out"); + } } + transport.getReceiver().close(); socket.close(); } catch (InterruptedException e) @@ -184,7 +193,7 @@ final class IoSender extends Thread implements Sender<ByteBuffer> throw new TransportException(e); } - if (exception != null) + if (reportException && exception != null) { throw new TransportException(exception); } @@ -246,6 +255,7 @@ final class IoSender extends Thread implements Sender<ByteBuffer> { log.error(e, "error in write thread"); exception = e; + close(false); break; } tail.getAndAdd(length); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 2f05cf5e55..52accb6b97 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -106,7 +106,8 @@ public final class IoTransport Connection conn = new Connection (new Disassembler(new OutputHandler(sender), 64*1024 - 1), delegate); - receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 2*readBufferSize); + receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), + 2*readBufferSize, timeout); return conn; } |
