diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:09:14 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:09:14 +0000 |
| commit | b4f9004439f56f492931f4b35f7fa0ae58f3ff85 (patch) | |
| tree | 773e2eb0f54ca1c1f66c34ecd759e58b03a74c88 /java/common/src | |
| parent | d9f3516ede5a60f446d9189b8935097479812da0 (diff) | |
| download | qpid-python-b4f9004439f56f492931f4b35f7fa0ae58f3ff85.tar.gz | |
QPID-3342: rationalise the existing 0-10 transport code and introduce new NetworkTransport + NetworkConnection abstraction. Decouple IoSender and IoReceiver, initiate their threads after the constructor completes.
Applied patch by Keith Wall and myself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
14 files changed, 315 insertions, 273 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 19f00378b1..609611e3fb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -27,6 +27,7 @@ import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; import static org.apache.qpid.transport.Connection.State.RESUMING; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,6 +41,12 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; +import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.InputHandler; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.io.IoNetworkTransport; import org.apache.qpid.transport.network.security.SecurityLayer; import org.apache.qpid.transport.util.Logger; import org.apache.qpid.transport.util.Waiter; @@ -235,13 +242,15 @@ public class Connection extends ConnectionInvoker state = OPENING; userID = settings.getUsername(); delegate = new ClientDelegate(settings); - - TransportBuilder transport = new TransportBuilder(); - transport.init(this); - this.sender = transport.buildSenderPipe(); - transport.buildReceiverPipe(this); - this.securityLayer = transport.getSecurityLayer(); - + + securityLayer = new SecurityLayer(); + securityLayer.init(this); + + OutgoingNetworkTransport transport = new IoNetworkTransport(); + Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this))); + NetworkConnection network = transport.connect(settings, receiver, null); + sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize()); + send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); diff --git a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java b/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java deleted file mode 100644 index c08909c6e4..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/TransportBuilder.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.network.Assembler; -import org.apache.qpid.transport.network.Disassembler; -import org.apache.qpid.transport.network.InputHandler; -import org.apache.qpid.transport.network.NetworkTransport; -import org.apache.qpid.transport.network.Transport; -import org.apache.qpid.transport.network.security.SecurityLayer; - -public class TransportBuilder -{ - private Connection con; - private ConnectionSettings settings; - private NetworkTransport transport; - private SecurityLayer securityLayer = new SecurityLayer(); - - public void init(Connection con) throws TransportException - { - this.con = con; - this.settings = con.getConnectionSettings(); - transport = Transport.getTransport(); - transport.init(settings); - securityLayer.init(con); - } - - public Sender<ProtocolEvent> buildSenderPipe() - { - ConnectionSettings settings = con.getConnectionSettings(); - - // Io layer - Sender<ByteBuffer> sender = transport.sender(); - - // Security layer - sender = securityLayer.sender(sender); - - Disassembler dis = new Disassembler(sender, settings.getMaxFrameSize()); - return dis; - } - - public void buildReceiverPipe(Receiver<ProtocolEvent> delegate) - { - Receiver<ByteBuffer> receiver = new InputHandler(new Assembler(delegate)); - - // Security layer - receiver = securityLayer.receiver(receiver); - - //Io layer - transport.receiver(receiver); - } - - public SecurityLayer getSecurityLayer() - { - return securityLayer; - } - -}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 69b3a0ce45..80b32ea909 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoContext.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -1,5 +1,5 @@ /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,29 +7,37 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ -package org.apache.qpid.transport.network.io; +package org.apache.qpid.transport.network; -import java.net.Socket; +import java.net.SocketAddress; import java.nio.ByteBuffer; import org.apache.qpid.transport.Sender; -public interface IoContext +public interface NetworkConnection { Sender<ByteBuffer> getSender(); - - IoReceiver getReceiver(); - Socket getSocket(); -} + void close(); + + /** + * Returns the remote address of the underlying socket. + */ + SocketAddress getRemoteAddress(); + + /** + * Returns the local address of the underlying socket. + */ + SocketAddress getLocalAddress(); +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java index 5e12d7e7c6..9371835e89 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java @@ -20,19 +20,9 @@ */ package org.apache.qpid.transport.network; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.ConnectionSettings; - public interface NetworkTransport { - public void init(ConnectionSettings settings); - - public Sender<ByteBuffer> sender(); - - public void receiver(Receiver<ByteBuffer> delegate); - public void close(); -}
\ No newline at end of file + + public NetworkConnection getConnection(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java new file mode 100644 index 0000000000..ff86ba481f --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.Receiver; + +public interface OutgoingNetworkTransport extends NetworkTransport +{ + public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory); +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java deleted file mode 100644 index f0bf04d04f..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network; - -import org.apache.qpid.transport.TransportException; - -public class Transport -{ - private final static Class<?> transportClass; - - static - { - try - { - transportClass = - Class.forName(System.getProperty("qpid.transport", - "org.apache.qpid.transport.network.io.IoNetworkTransport")); - - } - catch(Exception e) - { - throw new Error("Error occured while loading Qpid Transport",e); - } - } - - public static NetworkTransport getTransport() throws TransportException - { - try - { - return (NetworkTransport)transportClass.newInstance(); - } - catch (Exception e) - { - throw new TransportException("Error while creating a new transport instance",e); - } - } -}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java new file mode 100644 index 0000000000..3252544fee --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -0,0 +1,82 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IoNetworkConnection implements NetworkConnection +{ + private static final Logger LOGGER = LoggerFactory.getLogger(IoNetworkConnection.class); + private final Socket _socket; + private final long _timeout; + private final IoSender _ioSender; + private final IoReceiver _ioReceiver; + + public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate, + int sendBufferSize, int receiveBufferSize, long timeout) + { + _socket = socket; + _timeout = timeout; + + _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); + _ioSender.registerCloseListener(_ioReceiver); + + _ioReceiver.initiate(); + _ioSender.initiate(); + } + + public Sender<ByteBuffer> getSender() + { + return _ioSender; + } + + public void close() + { + try + { + _ioSender.close(); + } + finally + { + _ioReceiver.close(false); + } + } + + public SocketAddress getRemoteAddress() + { + return _socket.getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socket.getLocalSocketAddress(); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index dd6a37eca2..d611ab1cf3 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -27,14 +27,15 @@ import java.net.Socket; import java.net.SocketException; import java.nio.ByteBuffer; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; import org.apache.qpid.transport.util.Logger; -public class IoNetworkTransport implements NetworkTransport, IoContext +public class IoNetworkTransport implements OutgoingNetworkTransport { static { @@ -44,34 +45,31 @@ public class IoNetworkTransport implements NetworkTransport, IoContext (Boolean.getBoolean("amqj.enableDirectBuffers")); } - private static final Logger log = Logger.get(IoNetworkTransport.class); + private static final Logger LOGGER = Logger.get(IoNetworkTransport.class); - private Socket socket; - private Sender<ByteBuffer> sender; - private IoReceiver receiver; - private long timeout = 60000; - private ConnectionSettings settings; + private Socket _socket; + private IoNetworkConnection _connection; + private long _timeout = 60000; - public void init(ConnectionSettings settings) + public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory) { + int sendBufferSize = settings.getWriteBufferSize(); + int receiveBufferSize = settings.getReadBufferSize(); + try { - this.settings = settings; - InetAddress address = InetAddress.getByName(settings.getHost()); - socket = new Socket(); - socket.setReuseAddress(true); - socket.setTcpNoDelay(settings.isTcpNodelay()); - - log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize()); + _socket = new Socket(); + _socket.setReuseAddress(true); + _socket.setTcpNoDelay(settings.isTcpNodelay()); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); - socket.setSendBufferSize(settings.getWriteBufferSize()); - socket.setReceiveBufferSize(settings.getReadBufferSize()); + LOGGER.debug("SO_RCVBUF : %s", _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : %s", _socket.getSendBufferSize()); - log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize()); - log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); + InetAddress address = InetAddress.getByName(settings.getHost()); - socket.connect(new InetSocketAddress(address, settings.getPort())); + _socket.connect(new InetSocketAddress(address, settings.getPort())); } catch (SocketException e) { @@ -81,36 +79,35 @@ public class IoNetworkTransport implements NetworkTransport, IoContext { throw new TransportException("Error connecting to broker", e); } - } - public void receiver(Receiver<ByteBuffer> delegate) - { - receiver = new IoReceiver(this, delegate, - 2*settings.getReadBufferSize() , timeout); - } - - public Sender<ByteBuffer> sender() - { - return new IoSender(this, 2*settings.getWriteBufferSize(), timeout); - } + try + { + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + } + catch(Exception e) + { + try + { + _socket.close(); + } + catch(IOException ioe) + { + //ignored, throw based on original exception + } - public void close() - { - - } + throw new TransportException("Error creating network connection", e); + } - public Sender<ByteBuffer> getSender() - { - return sender; + return _connection; } - public IoReceiver getReceiver() + public void close() { - return receiver; + _connection.close(); } - public Socket getSocket() + public NetworkConnection getConnection() { - return socket; + return _connection; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 19a683d505..fea87fc350 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.transport.network.io; +import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.TransportException; @@ -37,43 +38,54 @@ import java.util.concurrent.atomic.AtomicBoolean; * */ -final class IoReceiver implements Runnable +final class IoReceiver implements Runnable, Closeable { private static final Logger log = Logger.get(IoReceiver.class); - private final IoContext ioCtx; private final Receiver<ByteBuffer> receiver; private final int bufferSize; private final Socket socket; private final long timeout; private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread receiverThread; - private final boolean shutdownBroken = - ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); + private static final boolean shutdownBroken; + static + { + String osName = System.getProperty("os.name"); + shutdownBroken = osName == null ? false : osName.matches("(?i).*windows.*"); + } - public IoReceiver(IoContext ioCtx, Receiver<ByteBuffer> receiver, - int bufferSize, long timeout) + public IoReceiver(Socket socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) { - this.ioCtx = ioCtx; this.receiver = receiver; this.bufferSize = bufferSize; - this.socket = ioCtx.getSocket(); + this.socket = socket; this.timeout = timeout; try { + //Create but deliberately don't start the thread. receiverThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) { - throw new Error("Error creating IOReceiver thread",e); + throw new RuntimeException("Error creating IOReceiver thread",e); } receiverThread.setDaemon(true); receiverThread.setName(String.format("IoReceiver - %s", socket.getRemoteSocketAddress())); + } + + public void initiate() + { receiverThread.start(); } + public void close() + { + close(false); + } + void close(boolean block) { if (!closed.getAndSet(true)) diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 66b97e8225..1bb515624c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -24,8 +24,11 @@ import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.common.Closeable; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.SenderException; @@ -43,7 +46,6 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> // we can test other cases as well private final static int START = Integer.MAX_VALUE - 10; - private final IoContext ioCtx; private final long timeout; private final Socket socket; private final OutputStream out; @@ -56,14 +58,13 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; + private final List<Closeable> _listeners = new ArrayList<Closeable>(); private volatile Throwable exception = null; - - public IoSender(IoContext ioCtx, int bufferSize, long timeout) + public IoSender(Socket socket, int bufferSize, long timeout) { - this.ioCtx = ioCtx; - this.socket = ioCtx.getSocket(); + this.socket = socket; this.buffer = new byte[pof2(bufferSize)]; // buffer size must be a power of 2 this.timeout = timeout; @@ -78,6 +79,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> try { + //Create but deliberately don't start the thread. senderThread = Threading.getThreadFactory().createThread(this); } catch(Exception e) @@ -87,6 +89,10 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> senderThread.setDaemon(true); senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress())); + } + + public void initiate() + { senderThread.start(); } @@ -204,16 +210,20 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> senderThread.join(timeout); if (senderThread.isAlive()) { + log.error("join timed out"); throw new SenderException("join timed out"); } } - ioCtx.getReceiver().close(false); } catch (InterruptedException e) { + log.error("interrupted whilst waiting for sender thread to stop"); throw new SenderException(e); } - + finally + { + closeListeners(); + } if (reportException && exception != null) { throw new SenderException(exception); @@ -221,6 +231,28 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } + private void closeListeners() + { + Exception ex = null; + for(Closeable listener : _listeners) + { + try + { + listener.close(); + } + catch(Exception e) + { + log.error("Exception closing listener: " + e.getMessage()); + ex = e; + } + } + + if (ex != null) + { + throw new SenderException(ex.getMessage(), ex); + } + } + public void run() { final int size = buffer.length; @@ -304,4 +336,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> throw new SenderException(e); } } + + public void registerCloseListener(Closeable listener) + { + _listeners.add(listener); + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index f261111777..796a845593 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -42,7 +42,7 @@ import org.apache.qpid.transport.util.Logger; * SO_RCVBUF - amqj.receiveBufferSize * SO_SNDBUF - amqj.sendBufferSize */ -public final class IoTransport<E> implements IoContext +public final class IoTransport<E> { static @@ -70,44 +70,63 @@ public final class IoTransport<E> implements IoContext IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl) { this.socket = socket; - + if (ssl) { - SSLEngine engine = null; - SSLContext sslCtx; - try - { - sslCtx = createSSLContext(); - } - catch (Exception e) - { - throw new TransportException("Error creating SSL Context", e); - } - - try - { - engine = sslCtx.createSSLEngine(); - engine.setUseClientMode(true); - } - catch(Exception e) - { - throw new TransportException("Error creating SSL Engine", e); - } - - this.sender = new SSLSender(engine,new IoSender(this, 2*writeBufferSize, timeout)); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), - 2*readBufferSize, timeout); - - log.info("SSL Sender and Receiver initiated"); + setupSSLTransport(socket, binding); } else { - this.sender = new IoSender(this, 2*writeBufferSize, timeout); - this.endpoint = binding.endpoint(sender); - this.receiver = new IoReceiver(this, binding.receiver(endpoint), - 2*readBufferSize, timeout); + setupTransport(socket, binding); + } + } + + private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding) + { + IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); + ios.initiate(); + + this.sender = ios; + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(socket, binding.receiver(endpoint), + 2*readBufferSize, timeout); + this.receiver.initiate(); + + ios.registerCloseListener(this.receiver); + } + + private void setupSSLTransport(Socket socket, Binding<E, ByteBuffer> binding) + { + SSLEngine engine = null; + SSLContext sslCtx; + try + { + sslCtx = createSSLContext(); + } + catch (Exception e) + { + throw new TransportException("Error creating SSL Context", e); + } + + try + { + engine = sslCtx.createSSLEngine(); + engine.setUseClientMode(true); } + catch(Exception e) + { + throw new TransportException("Error creating SSL Engine", e); + } + IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); + ios.initiate(); + this.sender = new SSLSender(engine,ios); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender), + 2*readBufferSize, timeout); + this.receiver.initiate(); + ios.registerCloseListener(this.receiver); + + log.info("SSL Sender and Receiver initiated"); } public Sender<ByteBuffer> getSender() diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java index 27255f79f6..2d9e4e9a7e 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java @@ -43,8 +43,7 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { this.delegate = delegate; log.debug("SASL Sender enabled"); } - - @Override + public void close() { @@ -65,13 +64,11 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { } } - @Override public void flush() { delegate.flush(); } - @Override public void send(ByteBuffer buf) { if (closed.get()) @@ -108,7 +105,6 @@ public class SASLSender extends SASLEncryptor implements Sender<ByteBuffer> { } } - @Override public void setIdleTimeout(int i) { delegate.setIdleTimeout(i); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java index 14f28f8828..0dd86d4560 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java @@ -48,51 +48,45 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager kmf.init(ks, keyStorePassword.toCharArray()); this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; } - - @Override + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { log.debug("chooseClientAlias:Returning alias " + alias); return alias; } - @Override public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { return delegate.chooseServerAlias(keyType, issuers, socket); } - @Override public X509Certificate[] getCertificateChain(String alias) { return delegate.getCertificateChain(alias); } - @Override public String[] getClientAliases(String keyType, Principal[] issuers) { log.debug("getClientAliases:Returning alias " + alias); return new String[]{alias}; } - @Override public PrivateKey getPrivateKey(String alias) { return delegate.getPrivateKey(alias); } - @Override public String[] getServerAliases(String keyType, Principal[] issuers) { return delegate.getServerAliases(keyType, issuers); } - + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { log.debug("chooseEngineClientAlias:Returning alias " + alias); return alias; } - + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { return delegate.chooseEngineServerAlias(keyType, issuers, engine); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java index 8530240dcc..8530240dcc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java |
