summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:09:14 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:09:14 +0000
commitb4f9004439f56f492931f4b35f7fa0ae58f3ff85 (patch)
tree773e2eb0f54ca1c1f66c34ecd759e58b03a74c88 /java/common/src
parentd9f3516ede5a60f446d9189b8935097479812da0 (diff)
downloadqpid-python-b4f9004439f56f492931f4b35f7fa0ae58f3ff85.tar.gz
QPID-3342: rationalise the existing 0-10 transport code and introduce new NetworkTransport + NetworkConnection abstraction. Decouple IoSender and IoReceiver, initiate their threads after the constructor completes.
Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java23
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java78
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (renamed from java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java)30
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Transport.java56
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java82
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java87
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java85
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java12
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java (renamed from java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java)0
14 files changed, 315 insertions, 273 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 19f00378b1..609611e3fb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -27,6 +27,7 @@ import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
import static org.apache.qpid.transport.Connection.State.RESUMING;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -40,6 +41,12 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
@@ -235,13 +242,15 @@ public class Connection extends ConnectionInvoker
state = OPENING;
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
-
- TransportBuilder transport = new TransportBuilder();
- transport.init(this);
- this.sender = transport.buildSenderPipe();
- transport.buildReceiverPipe(this);
- this.securityLayer = transport.getSecurityLayer();
-
+
+ securityLayer = new SecurityLayer();
+ securityLayer.init(this);
+
+ OutgoingNetworkTransport transport = new IoNetworkTransport();
+ Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ NetworkConnection network = transport.connect(settings, receiver, null);
+ sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
+
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
deleted file mode 100644
index c08909c6e4..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.network.Assembler;
-import org.apache.qpid.transport.network.Disassembler;
-import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.NetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-import org.apache.qpid.transport.network.security.SecurityLayer;
-
-public class TransportBuilder
-{
- private Connection con;
- private ConnectionSettings settings;
- private NetworkTransport transport;
- private SecurityLayer securityLayer = new SecurityLayer();
-
- public void init(Connection con) throws TransportException
- {
- this.con = con;
- this.settings = con.getConnectionSettings();
- transport = Transport.getTransport();
- transport.init(settings);
- securityLayer.init(con);
- }
-
- public Sender<ProtocolEvent> buildSenderPipe()
- {
- ConnectionSettings settings = con.getConnectionSettings();
-
- // Io layer
- Sender<ByteBuffer> sender = transport.sender();
-
- // Security layer
- sender = securityLayer.sender(sender);
-
- Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize());
- return dis;
- }
-
- public void buildReceiverPipe(Receiver<ProtocolEvent> delegate)
- {
- Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate));
-
- // Security layer
- receiver = securityLayer.receiver(receiver);
-
- //Io layer
- transport.receiver(receiver);
- }
-
- public SecurityLayer getSecurityLayer()
- {
- return securityLayer;
- }
-
-} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
index 69b3a0ce45..80b32ea909 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
@@ -1,5 +1,5 @@
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -7,29 +7,37 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
-package org.apache.qpid.transport.network.io;
+package org.apache.qpid.transport.network;
-import java.net.Socket;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.apache.qpid.transport.Sender;
-public interface IoContext
+public interface NetworkConnection
{
Sender<ByteBuffer> getSender();
-
- IoReceiver getReceiver();
- Socket getSocket();
-}
+ void close();
+
+ /**
+ * Returns the remote address of the underlying socket.
+ */
+ SocketAddress getRemoteAddress();
+
+ /**
+ * Returns the local address of the underlying socket.
+ */
+ SocketAddress getLocalAddress();
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
index 5e12d7e7c6..9371835e89 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
@@ -20,19 +20,9 @@
*/
package org.apache.qpid.transport.network;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.ConnectionSettings;
-
public interface NetworkTransport
{
- public void init(ConnectionSettings settings);
-
- public Sender<ByteBuffer> sender();
-
- public void receiver(Receiver<ByteBuffer> delegate);
-
public void close();
-} \ No newline at end of file
+
+ public NetworkConnection getConnection();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
new file mode 100644
index 0000000000..ff86ba481f
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.Receiver;
+
+public interface OutgoingNetworkTransport extends NetworkTransport
+{
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory);
+} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
deleted file mode 100644
index f0bf04d04f..0000000000
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.transport.network;
-
-import org.apache.qpid.transport.TransportException;
-
-public class Transport
-{
- private final static Class<?> transportClass;
-
- static
- {
- try
- {
- transportClass =
- Class.forName(System.getProperty("qpid.transport",
- "org.apache.qpid.transport.network.io.IoNetworkTransport"));
-
- }
- catch(Exception e)
- {
- throw new Error("Error occured while loading Qpid Transport",e);
- }
- }
-
- public static NetworkTransport getTransport() throws TransportException
- {
- try
- {
- return (NetworkTransport)transportClass.newInstance();
- }
- catch (Exception e)
- {
- throw new TransportException("Error while creating a new transport instance",e);
- }
- }
-} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
new file mode 100644
index 0000000000..3252544fee
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -0,0 +1,82 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IoNetworkConnection implements NetworkConnection
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class);
+ private final Socket _socket;
+ private final long _timeout;
+ private final IoSender _ioSender;
+ private final IoReceiver _ioReceiver;
+
+ public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
+ int sendBufferSize, int receiveBufferSize, long timeout)
+ {
+ _socket = socket;
+ _timeout = timeout;
+
+ _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+ _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
+ _ioSender.registerCloseListener(_ioReceiver);
+
+ _ioReceiver.initiate();
+ _ioSender.initiate();
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _ioSender;
+ }
+
+ public void close()
+ {
+ try
+ {
+ _ioSender.close();
+ }
+ finally
+ {
+ _ioReceiver.close(false);
+ }
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _socket.getRemoteSocketAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _socket.getLocalSocketAddress();
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index dd6a37eca2..d611ab1cf3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -27,14 +27,15 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.NetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
import org.apache.qpid.transport.util.Logger;
-public class IoNetworkTransport implements NetworkTransport, IoContext
+public class IoNetworkTransport implements OutgoingNetworkTransport
{
static
{
@@ -44,34 +45,31 @@ public class IoNetworkTransport implements NetworkTransport, IoContext
(Boolean.getBoolean("amqj.enableDirectBuffers"));
}
- private static final Logger log = Logger.get(IoNetworkTransport.class);
+ private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
- private Socket socket;
- private Sender<ByteBuffer> sender;
- private IoReceiver receiver;
- private long timeout = 60000;
- private ConnectionSettings settings;
+ private Socket _socket;
+ private IoNetworkConnection _connection;
+ private long _timeout = 60000;
- public void init(ConnectionSettings settings)
+ public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
{
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
try
{
- this.settings = settings;
- InetAddress address = InetAddress.getByName(settings.getHost());
- socket = new Socket();
- socket.setReuseAddress(true);
- socket.setTcpNoDelay(settings.isTcpNodelay());
-
- log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+ _socket = new Socket();
+ _socket.setReuseAddress(true);
+ _socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket.setSendBufferSize(sendBufferSize);
+ _socket.setReceiveBufferSize(receiveBufferSize);
- socket.setSendBufferSize(settings.getWriteBufferSize());
- socket.setReceiveBufferSize(settings.getReadBufferSize());
+ LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize());
- log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
- log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+ InetAddress address = InetAddress.getByName(settings.getHost());
- socket.connect(new InetSocketAddress(address, settings.getPort()));
+ _socket.connect(new InetSocketAddress(address, settings.getPort()));
}
catch (SocketException e)
{
@@ -81,36 +79,35 @@ public class IoNetworkTransport implements NetworkTransport, IoContext
{
throw new TransportException("Error connecting to broker", e);
}
- }
- public void receiver(Receiver<ByteBuffer> delegate)
- {
- receiver = new IoReceiver(this, delegate,
- 2*settings.getReadBufferSize() , timeout);
- }
-
- public Sender<ByteBuffer> sender()
- {
- return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
- }
+ try
+ {
+ _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
- public void close()
- {
-
- }
+ throw new TransportException("Error creating network connection", e);
+ }
- public Sender<ByteBuffer> getSender()
- {
- return sender;
+ return _connection;
}
- public IoReceiver getReceiver()
+ public void close()
{
- return receiver;
+ _connection.close();
}
- public Socket getSocket()
+ public NetworkConnection getConnection()
{
- return socket;
+ return _connection;
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 19a683d505..fea87fc350 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.transport.network.io;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.TransportException;
@@ -37,43 +38,54 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*/
-final class IoReceiver implements Runnable
+final class IoReceiver implements Runnable, Closeable
{
private static final Logger log = Logger.get(IoReceiver.class);
- private final IoContext ioCtx;
private final Receiver<ByteBuffer> receiver;
private final int bufferSize;
private final Socket socket;
private final long timeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread receiverThread;
- private final boolean shutdownBroken =
- ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*");
+ private static final boolean shutdownBroken;
+ static
+ {
+ String osName = System.getProperty("os.name");
+ shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*");
+ }
- public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver,
- int bufferSize, long timeout)
+ public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
this.receiver = receiver;
this.bufferSize = bufferSize;
- this.socket = ioCtx.getSocket();
+ this.socket = socket;
this.timeout = timeout;
try
{
+ //Create but deliberately don't start the thread.
receiverThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
{
- throw new Error("Error creating IOReceiver thread",e);
+ throw new RuntimeException("Error creating IOReceiver thread",e);
}
receiverThread.setDaemon(true);
receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
receiverThread.start();
}
+ public void close()
+ {
+ close(false);
+ }
+
void close(boolean block)
{
if (!closed.getAndSet(true))
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 66b97e8225..1bb515624c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -24,8 +24,11 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.common.Closeable;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderException;
@@ -43,7 +46,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
// we can test other cases as well
private final static int START = Integer.MAX_VALUE - 10;
- private final IoContext ioCtx;
private final long timeout;
private final Socket socket;
private final OutputStream out;
@@ -56,14 +58,13 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
+ private final List<Closeable> _listeners = new ArrayList<Closeable>();
private volatile Throwable exception = null;
-
- public IoSender(IoContext ioCtx, int bufferSize, long timeout)
+ public IoSender(Socket socket, int bufferSize, long timeout)
{
- this.ioCtx = ioCtx;
- this.socket = ioCtx.getSocket();
+ this.socket = socket;
this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2
this.timeout = timeout;
@@ -78,6 +79,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
try
{
+ //Create but deliberately don't start the thread.
senderThread = Threading.getThreadFactory().createThread(this);
}
catch(Exception e)
@@ -87,6 +89,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
senderThread.setDaemon(true);
senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
senderThread.start();
}
@@ -204,16 +210,20 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
senderThread.join(timeout);
if (senderThread.isAlive())
{
+ log.error("join timed out");
throw new SenderException("join timed out");
}
}
- ioCtx.getReceiver().close(false);
}
catch (InterruptedException e)
{
+ log.error("interrupted whilst waiting for sender thread to stop");
throw new SenderException(e);
}
-
+ finally
+ {
+ closeListeners();
+ }
if (reportException && exception != null)
{
throw new SenderException(exception);
@@ -221,6 +231,28 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
+ private void closeListeners()
+ {
+ Exception ex = null;
+ for(Closeable listener : _listeners)
+ {
+ try
+ {
+ listener.close();
+ }
+ catch(Exception e)
+ {
+ log.error("Exception closing listener: " + e.getMessage());
+ ex = e;
+ }
+ }
+
+ if (ex != null)
+ {
+ throw new SenderException(ex.getMessage(), ex);
+ }
+ }
+
public void run()
{
final int size = buffer.length;
@@ -304,4 +336,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
throw new SenderException(e);
}
}
+
+ public void registerCloseListener(Closeable listener)
+ {
+ _listeners.add(listener);
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index f261111777..796a845593 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -42,7 +42,7 @@ import org.apache.qpid.transport.util.Logger;
* SO_RCVBUF - amqj.receiveBufferSize
* SO_SNDBUF - amqj.sendBufferSize
*/
-public final class IoTransport<E> implements IoContext
+public final class IoTransport<E>
{
static
@@ -70,44 +70,63 @@ public final class IoTransport<E> implements IoContext
IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
{
this.socket = socket;
-
+
if (ssl)
{
- SSLEngine engine = null;
- SSLContext sslCtx;
- try
- {
- sslCtx = createSSLContext();
- }
- catch (Exception e)
- {
- throw new TransportException("Error creating SSL Context", e);
- }
-
- try
- {
- engine = sslCtx.createSSLEngine();
- engine.setUseClientMode(true);
- }
- catch(Exception e)
- {
- throw new TransportException("Error creating SSL Engine", e);
- }
-
- this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout));
- this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
- 2*readBufferSize, timeout);
-
- log.info("SSL Sender and Receiver initiated");
+ setupSSLTransport(socket, binding);
}
else
{
- this.sender = new IoSender(this, 2*writeBufferSize, timeout);
- this.endpoint = binding.endpoint(sender);
- this.receiver = new IoReceiver(this, binding.receiver(endpoint),
- 2*readBufferSize, timeout);
+ setupTransport(socket, binding);
+ }
+ }
+
+ private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
+ {
+ IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
+ ios.initiate();
+
+ this.sender = ios;
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(socket, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ this.receiver.initiate();
+
+ ios.registerCloseListener(this.receiver);
+ }
+
+ private void setupSSLTransport(Socket socket, Binding<E, ByteBuffer> binding)
+ {
+ SSLEngine engine = null;
+ SSLContext sslCtx;
+ try
+ {
+ sslCtx = createSSLContext();
+ }
+ catch (Exception e)
+ {
+ throw new TransportException("Error creating SSL Context", e);
+ }
+
+ try
+ {
+ engine = sslCtx.createSSLEngine();
+ engine.setUseClientMode(true);
}
+ catch(Exception e)
+ {
+ throw new TransportException("Error creating SSL Engine", e);
+ }
+ IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
+ ios.initiate();
+ this.sender = new SSLSender(engine,ios);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
+ 2*readBufferSize, timeout);
+ this.receiver.initiate();
+ ios.registerCloseListener(this.receiver);
+
+ log.info("SSL Sender and Receiver initiated");
}
public Sender<ByteBuffer> getSender()
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 27255f79f6..2d9e4e9a7e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -43,8 +43,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
this.delegate = delegate;
log.debug("SASL Sender enabled");
}
-
- @Override
+
public void close()
{
@@ -65,13 +64,11 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
}
}
- @Override
public void flush()
{
delegate.flush();
}
- @Override
public void send(ByteBuffer buf)
{
if (closed.get())
@@ -108,7 +105,6 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> {
}
}
- @Override
public void setIdleTimeout(int i)
{
delegate.setIdleTimeout(i);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
index 14f28f8828..0dd86d4560 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
@@ -48,51 +48,45 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager
kmf.init(ks, keyStorePassword.toCharArray());
this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0];
}
-
- @Override
+
public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
{
log.debug("chooseClientAlias:Returning alias " + alias);
return alias;
}
- @Override
public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
{
return delegate.chooseServerAlias(keyType, issuers, socket);
}
- @Override
public X509Certificate[] getCertificateChain(String alias)
{
return delegate.getCertificateChain(alias);
}
- @Override
public String[] getClientAliases(String keyType, Principal[] issuers)
{
log.debug("getClientAliases:Returning alias " + alias);
return new String[]{alias};
}
- @Override
public PrivateKey getPrivateKey(String alias)
{
return delegate.getPrivateKey(alias);
}
- @Override
public String[] getServerAliases(String keyType, Principal[] issuers)
{
return delegate.getServerAliases(keyType, issuers);
}
-
+
public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
{
log.debug("chooseEngineClientAlias:Returning alias " + alias);
return alias;
}
-
+
public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)
{
return delegate.chooseEngineServerAlias(keyType, issuers, engine);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
index 8530240dcc..8530240dcc 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java