From b4f9004439f56f492931f4b35f7fa0ae58f3ff85 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 7 Jul 2011 15:09:14 +0000 Subject: QPID-3342: rationalise the existing 0-10 transport code and introduce new NetworkTransport + NetworkConnection abstraction. Decouple IoSender and IoReceiver, initiate their threads after the constructor completes. Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143866 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/transport/Connection.java | 23 ++++-- .../apache/qpid/transport/TransportBuilder.java | 78 ------------------ .../qpid/transport/network/NetworkConnection.java | 43 ++++++++++ .../qpid/transport/network/NetworkTransport.java | 16 +--- .../network/OutgoingNetworkTransport.java | 32 ++++++++ .../apache/qpid/transport/network/Transport.java | 56 ------------- .../qpid/transport/network/io/IoAcceptor.java | 92 ---------------------- .../qpid/transport/network/io/IoContext.java | 35 -------- .../transport/network/io/IoNetworkConnection.java | 82 +++++++++++++++++++ .../transport/network/io/IoNetworkTransport.java | 87 ++++++++++---------- .../qpid/transport/network/io/IoReceiver.java | 30 ++++--- .../apache/qpid/transport/network/io/IoSender.java | 51 ++++++++++-- .../qpid/transport/network/io/IoTransport.java | 85 ++++++++++++-------- .../network/security/sasl/SASLSender.java | 6 +- .../security/ssl/QpidClientX509KeyManager.java | 12 +-- .../qpid/transport/network/io/IoAcceptor.java | 92 ++++++++++++++++++++++ 16 files changed, 431 insertions(+), 389 deletions(-) delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/Transport.java delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java create mode 100644 java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 19f00378b1..609611e3fb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -27,6 +27,7 @@ import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; import static org.apache.qpid.transport.Connection.State.RESUMING; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,6 +41,12 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.io.IoNetworkTransport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; @@ -235,13 +242,15 @@ public class Connection extends ConnectionInvoker state = OPENING; userID = settings.getUsername(); delegate = new ClientDelegate(settings); - - TransportBuilder transport = new TransportBuilder(); - transport.init(this); - this.sender = transport.buildSenderPipe(); - transport.buildReceiverPipe(this); - this.securityLayer = transport.getSecurityLayer(); - + + securityLayer = new SecurityLayer(); + securityLayer.init(this); + + OutgoingNetworkTransport transport = new IoNetworkTransport(); + Receiver receiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + NetworkConnection network = transport.connect(settings, receiver, null); + sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize()); + send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); diff --git a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java deleted file mode 100644 index c08909c6e4..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.security.SecurityLayer; - -public class TransportBuilder -{ - private Connection con; - private ConnectionSettings settings; - private NetworkTransport transport; - private SecurityLayer securityLayer = new SecurityLayer(); - - public void init(Connection con) throws TransportException - { - this.con = con; - this.settings = con.getConnectionSettings(); - transport = Transport.getTransport(); - transport.init(settings); - securityLayer.init(con); - } - - public Sender buildSenderPipe() - { - ConnectionSettings settings = con.getConnectionSettings(); - - // Io layer - Sender sender = transport.sender(); - - // Security layer - sender = securityLayer.sender(sender); - - Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize()); - return dis; - } - - public void buildReceiverPipe(Receiver delegate) - { - Receiver receiver = new InputHandler(new Assembler(delegate)); - - // Security layer - receiver = securityLayer.receiver(receiver); - - //Io layer - transport.receiver(receiver); - } - - public SecurityLayer getSecurityLayer() - { - return securityLayer; - } - -} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java new file mode 100644 index 0000000000..80b32ea909 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Sender; + +public interface NetworkConnection +{ + Sender getSender(); + + void close(); + + /** + * Returns the remote address of the underlying socket. + */ + SocketAddress getRemoteAddress(); + + /** + * Returns the local address of the underlying socket. + */ + SocketAddress getLocalAddress(); +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java index 5e12d7e7c6..9371835e89 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java @@ -20,19 +20,9 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.ConnectionSettings; - public interface NetworkTransport { - public void init(ConnectionSettings settings); - - public Sender sender(); - - public void receiver(Receiver delegate); - public void close(); -} \ No newline at end of file + + public NetworkConnection getConnection(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java new file mode 100644 index 0000000000..ff86ba481f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; + +public interface OutgoingNetworkTransport extends NetworkTransport +{ + public NetworkConnection connect(ConnectionSettings settings, Receiver delegate, SSLContextFactory sslFactory); +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java deleted file mode 100644 index f0bf04d04f..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network; - -import org.apache.qpid.transport.TransportException; - -public class Transport -{ - private final static Class transportClass; - - static - { - try - { - transportClass = - Class.forName(System.getProperty("qpid.transport", - "org.apache.qpid.transport.network.io.IoNetworkTransport")); - - } - catch(Exception e) - { - throw new Error("Error occured while loading Qpid Transport",e); - } - } - - public static NetworkTransport getTransport() throws TransportException - { - try - { - return (NetworkTransport)transportClass.newInstance(); - } - catch (Exception e) - { - throw new TransportException("Error while creating a new transport instance",e); - } - } -} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java deleted file mode 100644 index 8530240dcc..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import org.apache.qpid.transport.Binding; -import org.apache.qpid.transport.TransportException; - -import java.io.IOException; - -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketAddress; - -import java.nio.ByteBuffer; - - -/** - * IoAcceptor - * - */ - -public class IoAcceptor extends Thread -{ - - - private ServerSocket socket; - private Binding binding; - - public IoAcceptor(SocketAddress address, Binding binding) - throws IOException - { - socket = new ServerSocket(); - socket.setReuseAddress(true); - socket.bind(address); - this.binding = binding; - - setName(String.format("IoAcceptor - %s", socket.getInetAddress())); - } - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() throws IOException - { - if (!socket.isClosed()) - { - socket.close(); - } - } - - public IoAcceptor(String host, int port, Binding binding) - throws IOException - { - this(new InetSocketAddress(host, port), binding); - } - - public void run() - { - while (true) - { - try - { - Socket sock = socket.accept(); - IoTransport transport = new IoTransport(sock, binding,false); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java deleted file mode 100644 index 69b3a0ce45..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.io; - -import java.net.Socket; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Sender; - -public interface IoContext -{ - Sender getSender(); - - IoReceiver getReceiver(); - - Socket getSocket(); -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java new file mode 100644 index 0000000000..3252544fee --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -0,0 +1,82 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IoNetworkConnection implements NetworkConnection +{ + private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class); + private final Socket _socket; + private final long _timeout; + private final IoSender _ioSender; + private final IoReceiver _ioReceiver; + + public IoNetworkConnection(Socket socket, Receiver delegate, + int sendBufferSize, int receiveBufferSize, long timeout) + { + _socket = socket; + _timeout = timeout; + + _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); + _ioSender.registerCloseListener(_ioReceiver); + + _ioReceiver.initiate(); + _ioSender.initiate(); + } + + public Sender getSender() + { + return _ioSender; + } + + public void close() + { + try + { + _ioSender.close(); + } + finally + { + _ioReceiver.close(false); + } + } + + public SocketAddress getRemoteAddress() + { + return _socket.getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socket.getLocalSocketAddress(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index dd6a37eca2..d611ab1cf3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -27,14 +27,15 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.util.Logger; -public class IoNetworkTransport implements NetworkTransport, IoContext +public class IoNetworkTransport implements OutgoingNetworkTransport { static { @@ -44,34 +45,31 @@ public class IoNetworkTransport implements NetworkTransport, IoContext (Boolean.getBoolean("amqj.enableDirectBuffers")); } - private static final Logger log = Logger.get(IoNetworkTransport.class); + private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); - private Socket socket; - private Sender sender; - private IoReceiver receiver; - private long timeout = 60000; - private ConnectionSettings settings; + private Socket _socket; + private IoNetworkConnection _connection; + private long _timeout = 60000; - public void init(ConnectionSettings settings) + public NetworkConnection connect(ConnectionSettings settings, Receiver delegate, SSLContextFactory sslFactory) { + int sendBufferSize = settings.getWriteBufferSize(); + int receiveBufferSize = settings.getReadBufferSize(); + try { - this.settings = settings; - InetAddress address = InetAddress.getByName(settings.getHost()); - socket = new Socket(); - socket.setReuseAddress(true); - socket.setTcpNoDelay(settings.isTcpNodelay()); - - log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + _socket = new Socket(); + _socket.setReuseAddress(true); + _socket.setTcpNoDelay(settings.isTcpNodelay()); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); - socket.setSendBufferSize(settings.getWriteBufferSize()); - socket.setReceiveBufferSize(settings.getReadBufferSize()); + LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize()); - log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + InetAddress address = InetAddress.getByName(settings.getHost()); - socket.connect(new InetSocketAddress(address, settings.getPort())); + _socket.connect(new InetSocketAddress(address, settings.getPort())); } catch (SocketException e) { @@ -81,36 +79,35 @@ public class IoNetworkTransport implements NetworkTransport, IoContext { throw new TransportException("Error connecting to broker", e); } - } - public void receiver(Receiver delegate) - { - receiver = new IoReceiver(this, delegate, - 2*settings.getReadBufferSize() , timeout); - } - - public Sender sender() - { - return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); - } + try + { + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + } + catch(Exception e) + { + try + { + _socket.close(); + } + catch(IOException ioe) + { + //ignored, throw based on original exception + } - public void close() - { - - } + throw new TransportException("Error creating network connection", e); + } - public Sender getSender() - { - return sender; + return _connection; } - public IoReceiver getReceiver() + public void close() { - return receiver; + _connection.close(); } - public Socket getSocket() + public NetworkConnection getConnection() { - return socket; + return _connection; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 19a683d505..fea87fc350 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.transport.network.io; +import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; @@ -37,43 +38,54 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver implements Runnable +final class IoReceiver implements Runnable, Closeable { private static final Logger log = Logger.get(IoReceiver.class); - private final IoContext ioCtx; private final Receiver receiver; private final int bufferSize; private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; - private final boolean shutdownBroken = - ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); + private static final boolean shutdownBroken; + static + { + String osName = System.getProperty("os.name"); + shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*"); + } - public IoReceiver(IoContext ioCtx, Receiver receiver, - int bufferSize, long timeout) + public IoReceiver(Socket socket, Receiver receiver, int bufferSize, long timeout) { - this.ioCtx = ioCtx; this.receiver = receiver; this.bufferSize = bufferSize; - this.socket = ioCtx.getSocket(); + this.socket = socket; this.timeout = timeout; try { + //Create but deliberately don't start the thread. receiverThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { - throw new Error("Error creating IOReceiver thread",e); + throw new RuntimeException("Error creating IOReceiver thread",e); } receiverThread.setDaemon(true); receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + } + + public void initiate() + { receiverThread.start(); } + public void close() + { + close(false); + } + void close(boolean block) { if (!closed.getAndSet(true)) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 66b97e8225..1bb515624c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,8 +24,11 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; @@ -43,7 +46,6 @@ public final class IoSender implements Runnable, Sender // we can test other cases as well private final static int START = Integer.MAX_VALUE - 10; - private final IoContext ioCtx; private final long timeout; private final Socket socket; private final OutputStream out; @@ -56,14 +58,13 @@ public final class IoSender implements Runnable, Sender private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; + private final List _listeners = new ArrayList(); private volatile Throwable exception = null; - - public IoSender(IoContext ioCtx, int bufferSize, long timeout) + public IoSender(Socket socket, int bufferSize, long timeout) { - this.ioCtx = ioCtx; - this.socket = ioCtx.getSocket(); + this.socket = socket; this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; @@ -78,6 +79,7 @@ public final class IoSender implements Runnable, Sender try { + //Create but deliberately don't start the thread. senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) @@ -87,6 +89,10 @@ public final class IoSender implements Runnable, Sender senderThread.setDaemon(true); senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + } + + public void initiate() + { senderThread.start(); } @@ -204,16 +210,20 @@ public final class IoSender implements Runnable, Sender senderThread.join(timeout); if (senderThread.isAlive()) { + log.error("join timed out"); throw new SenderException("join timed out"); } } - ioCtx.getReceiver().close(false); } catch (InterruptedException e) { + log.error("interrupted whilst waiting for sender thread to stop"); throw new SenderException(e); } - + finally + { + closeListeners(); + } if (reportException && exception != null) { throw new SenderException(exception); @@ -221,6 +231,28 @@ public final class IoSender implements Runnable, Sender } } + private void closeListeners() + { + Exception ex = null; + for(Closeable listener : _listeners) + { + try + { + listener.close(); + } + catch(Exception e) + { + log.error("Exception closing listener: " + e.getMessage()); + ex = e; + } + } + + if (ex != null) + { + throw new SenderException(ex.getMessage(), ex); + } + } + public void run() { final int size = buffer.length; @@ -304,4 +336,9 @@ public final class IoSender implements Runnable, Sender throw new SenderException(e); } } + + public void registerCloseListener(Closeable listener) + { + _listeners.add(listener); + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index f261111777..796a845593 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -42,7 +42,7 @@ import org.apache.qpid.transport.util.Logger; * SO_RCVBUF - amqj.receiveBufferSize * SO_SNDBUF - amqj.sendBufferSize */ -public final class IoTransport implements IoContext +public final class IoTransport { static @@ -70,44 +70,63 @@ public final class IoTransport implements IoContext IoTransport(Socket socket, Binding binding, boolean ssl) { this.socket = socket; - + if (ssl) { - SSLEngine engine = null; - SSLContext sslCtx; - try - { - sslCtx = createSSLContext(); - } - catch (Exception e) - { - throw new TransportException("Error creating SSL Context", e); - } - - try - { - engine = sslCtx.createSSLEngine(); - engine.setUseClientMode(true); - } - catch(Exception e) - { - throw new TransportException("Error creating SSL Engine", e); - } - - this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout)); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), - 2*readBufferSize, timeout); - - log.info("SSL Sender and Receiver initiated"); + setupSSLTransport(socket, binding); } else { - this.sender = new IoSender(this, 2*writeBufferSize, timeout); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, binding.receiver(endpoint), - 2*readBufferSize, timeout); + setupTransport(socket, binding); + } + } + + private void setupTransport(Socket socket, Binding binding) + { + IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); + ios.initiate(); + + this.sender = ios; + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(socket, binding.receiver(endpoint), + 2*readBufferSize, timeout); + this.receiver.initiate(); + + ios.registerCloseListener(this.receiver); + } + + private void setupSSLTransport(Socket socket, Binding binding) + { + SSLEngine engine = null; + SSLContext sslCtx; + try + { + sslCtx = createSSLContext(); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + try + { + engine = sslCtx.createSSLEngine(); + engine.setUseClientMode(true); } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); + ios.initiate(); + this.sender = new SSLSender(engine,ios); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), + 2*readBufferSize, timeout); + this.receiver.initiate(); + ios.registerCloseListener(this.receiver); + + log.info("SSL Sender and Receiver initiated"); } public Sender getSender() diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 27255f79f6..2d9e4e9a7e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -43,8 +43,7 @@ public class SASLSender extends SASLEncryptor implements Sender { this.delegate = delegate; log.debug("SASL Sender enabled"); } - - @Override + public void close() { @@ -65,13 +64,11 @@ public class SASLSender extends SASLEncryptor implements Sender { } } - @Override public void flush() { delegate.flush(); } - @Override public void send(ByteBuffer buf) { if (closed.get()) @@ -108,7 +105,6 @@ public class SASLSender extends SASLEncryptor implements Sender { } } - @Override public void setIdleTimeout(int i) { delegate.setIdleTimeout(i); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java index 14f28f8828..0dd86d4560 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java @@ -48,51 +48,45 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager kmf.init(ks, keyStorePassword.toCharArray()); this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; } - - @Override + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { log.debug("chooseClientAlias:Returning alias " + alias); return alias; } - @Override public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { return delegate.chooseServerAlias(keyType, issuers, socket); } - @Override public X509Certificate[] getCertificateChain(String alias) { return delegate.getCertificateChain(alias); } - @Override public String[] getClientAliases(String keyType, Principal[] issuers) { log.debug("getClientAliases:Returning alias " + alias); return new String[]{alias}; } - @Override public PrivateKey getPrivateKey(String alias) { return delegate.getPrivateKey(alias); } - @Override public String[] getServerAliases(String keyType, Principal[] issuers) { return delegate.getServerAliases(keyType, issuers); } - + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { log.debug("chooseEngineClientAlias:Returning alias " + alias); return alias; } - + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { return delegate.chooseEngineServerAlias(keyType, issuers, engine); diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java new file mode 100644 index 0000000000..8530240dcc --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.TransportException; + +import java.io.IOException; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; + +import java.nio.ByteBuffer; + + +/** + * IoAcceptor + * + */ + +public class IoAcceptor extends Thread +{ + + + private ServerSocket socket; + private Binding binding; + + public IoAcceptor(SocketAddress address, Binding binding) + throws IOException + { + socket = new ServerSocket(); + socket.setReuseAddress(true); + socket.bind(address); + this.binding = binding; + + setName(String.format("IoAcceptor - %s", socket.getInetAddress())); + } + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() throws IOException + { + if (!socket.isClosed()) + { + socket.close(); + } + } + + public IoAcceptor(String host, int port, Binding binding) + throws IOException + { + this(new InetSocketAddress(host, port), binding); + } + + public void run() + { + while (true) + { + try + { + Socket sock = socket.accept(); + IoTransport transport = new IoTransport(sock, binding,false); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + +} -- cgit v1.2.1