summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client-websocket/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-14 08:58:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-14 08:58:03 +0000
commit6dfbb79c9ce4835cc744c466e41bb1b42cee81c1 (patch)
treef341296e462617b4a80ec0e192f80224a438c405 /qpid/java/amqp-1-0-client-websocket/src
parentc9660933637b69a14ae870397a53b086a8d6ab85 (diff)
downloadqpid-python-6dfbb79c9ce4835cc744c466e41bb1b42cee81c1.tar.gz
QPID-5459 : Added configurable TLS parameters for AMQP 1.0 client (both TCP and WSS)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-client-websocket/src')
-rw-r--r--qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java68
1 files changed, 50 insertions, 18 deletions
diff --git a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
index 6c35e555ca..1805b593f1 100644
--- a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
+++ b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
@@ -27,14 +27,15 @@ import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketClient;
import org.eclipse.jetty.websocket.WebSocketClientFactory;
-import java.io.IOException;
+import javax.net.ssl.SSLContext;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,7 @@ class WebSocketProvider implements TransportProvider
private static final byte AMQP_HEADER_FRAME_TYPE = (byte) 222;
private static int _connections;
+ private static QueuedThreadPool _threadPool;
private final String _transport;
private static WebSocketClientFactory _factory;
@@ -53,23 +55,51 @@ class WebSocketProvider implements TransportProvider
_transport = transport;
}
- private static synchronized WebSocketClient createWebSocketClient() throws Exception
+ private static synchronized WebSocketClientFactory getWebSocketClientFactory(SSLContext context) throws Exception
{
- if(_factory == null)
+ if(_threadPool == null)
{
- _factory = new WebSocketClientFactory();
- _factory.start();
+ _threadPool = new QueuedThreadPool();
+ }
+ if(context != null)
+ {
+ WebSocketClientFactory factory = new WebSocketClientFactory(_threadPool);
+ SslContextFactory sslContextFactory = factory.getSslContextFactory();
+
+
+ sslContextFactory.setSslContext(context);
+
+ factory.start();
+
+ return factory;
+ }
+ else
+ {
+ if(_factory == null)
+ {
+ _factory = new WebSocketClientFactory(_threadPool);
+ _factory.start();
+ }
+ _connections++;
+ return _factory;
}
- _connections++;
- return _factory.newWebSocketClient();
}
- private static synchronized void removeClient() throws Exception
+
+ private static synchronized void removeClient(final WebSocketClientFactory factory) throws Exception
{
- if(--_connections == 0)
+
+ if(factory == _factory)
+ {
+ if(--_connections == 0)
+ {
+ _factory.stop();
+ _factory = null;
+ }
+ }
+ else
{
- _factory.stop();
- _factory = null;
+ factory.stop();
}
}
@@ -77,13 +107,13 @@ class WebSocketProvider implements TransportProvider
public void connect(final ConnectionEndpoint conn,
final String address,
final int port,
- final boolean ssl,
- final ExceptionHandler exceptionHandler) throws ConnectionException
+ final SSLContext sslContext, final ExceptionHandler exceptionHandler) throws ConnectionException
{
try
{
- WebSocketClient client = createWebSocketClient();
+ final WebSocketClientFactory webSocketClientFactory = getWebSocketClientFactory(sslContext);
+ WebSocketClient client = webSocketClientFactory.newWebSocketClient();
// Configure the client
client.setProtocol(AMQP_WEBSOCKET_SUBPROTOCOL);
@@ -138,7 +168,7 @@ class WebSocketProvider implements TransportProvider
public void onOpen(Connection connection)
{
- Thread outputThread = new Thread(new FrameOutputThread(connection, src, conn, exceptionHandler));
+ Thread outputThread = new Thread(new FrameOutputThread(connection, src, conn, exceptionHandler, webSocketClientFactory));
outputThread.setDaemon(true);
outputThread.start();
}
@@ -226,17 +256,19 @@ class WebSocketProvider implements TransportProvider
private final ExceptionHandler _exceptionHandler;
private final FrameWriter _frameWriter;
private final byte[] _buffer;
+ private final WebSocketClientFactory _factory;
public FrameOutputThread(final WebSocket.Connection connection,
final ConnectionHandler.FrameSource src,
final ConnectionEndpoint conn,
- final ExceptionHandler exceptionHandler)
+ final ExceptionHandler exceptionHandler, final WebSocketClientFactory factory)
{
_connection = connection;
_frameSource = src;
_exceptionHandler = exceptionHandler;
_frameWriter = new FrameWriter(conn.getDescribedTypeRegistry());
_buffer = new byte[conn.getMaxFrameSize()];
+ _factory = factory;
}
@Override
@@ -278,7 +310,7 @@ class WebSocketProvider implements TransportProvider
{
try
{
- removeClient();
+ removeClient(_factory);
}
catch (Exception e)
{