summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java244
2 files changed, 254 insertions, 17 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index c848ba7a16..468c8ba359 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -41,8 +41,7 @@ public class NonBlockingConnection implements NetworkConnection
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
private final SocketChannel _socket;
private final long _timeout;
- private final NonBlockingSender _ioSender;
- private final NonBlockingReceiver _ioReceiver;
+ private final NonBlockingSenderReceiver _nonBlockingSenderReceiver;
private int _maxReadIdle;
private int _maxWriteIdle;
private Principal _principal;
@@ -55,36 +54,30 @@ public class NonBlockingConnection implements NetworkConnection
_socket = socket;
_timeout = timeout;
- _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
- _ioReceiver.setTicker(ticker);
+// _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
+// _nonBlockingSenderReceiver.setTicker(ticker);
- _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
+// _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
- _ioSender.setReceiver(_ioReceiver);
+// _ioSender.setReceiver(_nonBlockingSenderReceiver);
+
+ _nonBlockingSenderReceiver = new NonBlockingSenderReceiver(_socket, delegate, receiveBufferSize, ticker);
}
public void start()
{
- _ioSender.initiate();
- _ioReceiver.initiate();
+ _nonBlockingSenderReceiver.initiate();
}
public Sender<ByteBuffer> getSender()
{
- return _ioSender;
+ return _nonBlockingSenderReceiver;
}
public void close()
{
- try
- {
- _ioSender.close();
- }
- finally
- {
- _ioReceiver.close(false);
- }
+ _nonBlockingSenderReceiver.close();
}
public SocketAddress getRemoteAddress()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
new file mode 100644
index 0000000000..df1ffac6b7
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.Ticker;
+
+public class NonBlockingSenderReceiver implements Runnable, Sender<ByteBuffer>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSenderReceiver.class);
+
+ private final SocketChannel _socketChannel;
+ private final Selector _selector;
+
+ private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+
+ private final Thread _ioThread;
+ private final String _remoteSocketAddress;
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final Receiver<ByteBuffer> _receiver;
+ private final int _receiveBufSize;
+ private final Ticker _ticker;
+
+
+
+ public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker)
+ {
+ _socketChannel = socketChannel;
+ _receiver = receiver;
+ _receiveBufSize = receiveBufSize;
+ _ticker = ticker;
+
+ try
+ {
+ _remoteSocketAddress = socketChannel.getRemoteAddress().toString();
+ _socketChannel.configureBlocking(false);
+ _selector = Selector.open();
+ _socketChannel.register(_selector, SelectionKey.OP_READ);
+ }
+ catch (IOException e)
+ {
+ throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
+ }
+ try
+ {
+ //Create but deliberately don't start the thread.
+ _ioThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error creating SenderReceiver thread for " + _remoteSocketAddress, e);
+ }
+
+ _ioThread.setDaemon(true);
+ _ioThread.setName(String.format("IoSenderReceiver - %s", _remoteSocketAddress));
+
+ }
+
+ public void initiate()
+ {
+ _ioThread.start();
+ }
+
+ @Override
+ public void setIdleTimeout(final int i)
+ {
+ // Probably unused - dead code to be removed??
+ }
+
+ @Override
+ public void send(final ByteBuffer msg)
+ {
+ // append to list and do selector wakeup
+ _buffers.add(msg);
+ _selector.wakeup();
+ }
+
+ @Override
+ public void run()
+ {
+ // never ending loop doing
+ // try to write all pending byte buffers, handle situation where zero bytes or part of a byte buffer is written
+ // read as much as you can
+ // try to write all pending byte buffers
+
+ while (!_closed.get())
+ {
+
+ try
+ {
+ long currentTime = System.currentTimeMillis();
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+
+ LOGGER.debug("Tick " + tick);
+
+ int numberReady = _selector.select(tick <= 0 ? 1 : tick);
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ selectionKeys.clear();
+
+ LOGGER.debug("Number Ready " + numberReady);
+
+ doWrite();
+ doRead();
+ boolean fullyWritten = doWrite();
+
+ _socketChannel.register(_selector, fullyWritten ? SelectionKey.OP_READ : (SelectionKey.OP_WRITE | SelectionKey.OP_READ));
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
+ close();
+ }
+ }
+
+ try
+ {
+ while(!doWrite())
+ {
+ }
+
+ try
+ {
+ _receiver.closed();
+ }
+ finally
+ {
+ _socketChannel.close();
+ }
+ }
+ catch (IOException e)
+ {
+ LOGGER.info("Exception performing final output for thread '" + _remoteSocketAddress + "': " + e);
+ }
+ }
+
+
+
+ @Override
+ public void flush()
+ {
+ // maybe just wakeup?
+
+ }
+
+ @Override
+ public void close()
+ {
+ LOGGER.debug("Closing " + _remoteSocketAddress);
+
+ _closed.set(true);
+ _selector.wakeup();
+
+ }
+
+ private boolean doWrite() throws IOException
+ {
+ int byteBuffersWritten = 0;
+
+ ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+ Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+ for (int i = 0; i < bufArray.length; i++)
+ {
+ bufArray[i] = bufferIterator.next();
+ }
+
+ _socketChannel.write(bufArray);
+
+ for (ByteBuffer buf : bufArray)
+ {
+ if (buf.remaining() == 0)
+ {
+ byteBuffersWritten++;
+ _buffers.poll();
+ }
+ }
+
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Written " + byteBuffersWritten + " byte buffer(s) completely");
+ }
+
+ return bufArray.length == byteBuffersWritten;
+ }
+
+ private void doRead() throws IOException
+ {
+
+ ByteBuffer buffer;
+ int remaining;
+ do
+ {
+ buffer = ByteBuffer.allocate(_receiveBufSize);
+ _socketChannel.read(buffer);
+ remaining = buffer.remaining();
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Read " + buffer.position() + " byte(s)");
+ }
+ buffer.flip();
+ _receiver.received(buffer);
+ }
+ while (remaining == 0);
+
+ }
+}