summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-05 14:48:00 +0000
committerKeith Wall <kwall@apache.org>2014-12-05 14:48:00 +0000
commita0cdd525f55c8386e2b3e86cdd683c69d181c209 (patch)
treeee0f6d906cbcfe1b0d8a60dc3ee23f963252377f /qpid/java/broker-core/src
parenta8c5c3888feed40fd6c47a44f677668250e7635d (diff)
downloadqpid-python-a0cdd525f55c8386e2b3e86cdd683c69d181c209.tar.gz
QPID-6262: Rob's prototype NIO work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1643302 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java281
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java6
3 files changed, 284 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index dd5e01ebc5..aa9f649995 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
new file mode 100644
index 0000000000..09e8a68fb0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLBufferingSender implements Sender<ByteBuffer>
+{
+ private static final Logger LOGGER = Logger.get(SSLBufferingSender.class);
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final Sender<ByteBuffer> _delegate;
+ private final SSLEngine _engine;
+ private final int _sslBufSize;
+ private final SSLStatus _sslStatus;
+
+ private String _hostname;
+
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
+
+
+ public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ {
+ _engine = engine;
+ _delegate = delegate;
+ _sslBufSize = engine.getSession().getPacketBufferSize();
+ _sslStatus = sslStatus;
+ }
+
+ public void setHostname(String hostname)
+ {
+ _hostname = hostname;
+ }
+
+ public void close()
+ {
+ if (!_closed.getAndSet(true))
+ {
+ if (_engine.isOutboundDone())
+ {
+ return;
+ }
+ LOGGER.debug("Closing SSL connection");
+ doSend();
+ _engine.closeOutbound();
+ try
+ {
+ tearDownSSLConnection();
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error closing SSL connection",e);
+ }
+
+
+ synchronized(_sslStatus.getSslLock())
+ {
+ while (!_engine.isOutboundDone())
+ {
+ try
+ {
+ _sslStatus.getSslLock().wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ }
+ }
+ _delegate.close();
+ }
+ }
+
+ private void tearDownSSLConnection() throws Exception
+ {
+ ByteBuffer netData = getNetDataBuffer();
+ SSLEngineResult result = _engine.wrap(ByteBuffer.allocate(0), netData);
+ Status status = result.getStatus();
+ int read = result.bytesProduced();
+ while (status != Status.CLOSED)
+ {
+ if (status == Status.BUFFER_OVERFLOW)
+ {
+ netData.clear();
+ }
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ _delegate.send(data);
+ flush();
+ }
+ result = _engine.wrap(ByteBuffer.allocate(0), netData);
+ status = result.getStatus();
+ read = result.bytesProduced();
+ }
+ }
+
+ private ByteBuffer getNetDataBuffer()
+ {
+ return ByteBuffer.allocate(_sslBufSize);
+ }
+
+ public void flush()
+ {
+ _delegate.flush();
+ }
+
+ public void send()
+ {
+ if(!_closed.get())
+ {
+ doSend();
+ }
+ }
+
+ public synchronized void send(ByteBuffer appData)
+ {
+ boolean buffered;
+ if(buffered = _appData.hasRemaining())
+ {
+ ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
+ newBuf.put(_appData);
+ newBuf.put(appData);
+ newBuf.flip();
+ _appData = newBuf;
+ }
+ if (_closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+ doSend();
+ if(!appData.hasRemaining())
+ {
+ _appData = EMPTY_BYTE_BUFFER;
+ }
+ else if(!buffered)
+ {
+ _appData = ByteBuffer.allocate(appData.remaining());
+ _appData.put(appData);
+ _appData.flip();
+ }
+ }
+
+ private synchronized void doSend()
+ {
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while((_appData.hasRemaining() || _engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
+ && !_sslStatus.getSslErrorFlag())
+ {
+ ByteBuffer netData = getNetDataBuffer();
+
+ int read = 0;
+ try
+ {
+ SSLEngineResult result = _engine.wrap(_appData, netData);
+ read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+ }
+ catch(SSLException e)
+ {
+ // Should this set _sslError??
+ throw new SenderException("SSL, Error occurred while encrypting data",e);
+ }
+
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ _delegate.send(data);
+ }
+
+ switch(status)
+ {
+ case CLOSED:
+ throw new SenderException("SSLEngine is closed");
+
+ case BUFFER_OVERFLOW:
+ netData.clear();
+ continue;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_WRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+
+ case NEED_TASK:
+ doTasks();
+ break;
+
+ case NEED_UNWRAP:
+ flush();
+ return;
+
+ case FINISHED:
+ if (_hostname != null)
+ {
+ SSLUtil.verifyHostname(_engine, _hostname);
+ }
+
+ case NOT_HANDSHAKING:
+ break; //do nothing
+
+ default:
+ throw new IllegalStateException("SSLSender: Invalid State " + status);
+ }
+
+ }
+ }
+
+ private void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = _engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ _delegate.setIdleTimeout(i);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index b1f6b84b72..d3a984c4f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -28,13 +28,13 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
{
@@ -78,10 +78,10 @@ class TCPandSSLTransport implements AcceptingTransport
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ _networkTransport = new NonBlockingNetworkTransport();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
+ _port.getParent(Broker.class), _sslContext,
settings.wantClientAuth(), settings.needClientAuth(),
_supported,
_defaultSupportedProtocolReply,