summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client-websocket
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-22 12:18:33 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-22 12:18:33 +0000
commitcf4fa14c7827f3f464a2edb99629f091b00947a2 (patch)
treebaae3ab68e1a36d3fe2dec1de9ff09df01916822 /qpid/java/amqp-1-0-client-websocket
parent6738190a2b499f35dd7a7329a2c8d9cd384ab752 (diff)
downloadqpid-python-cf4fa14c7827f3f464a2edb99629f091b00947a2.tar.gz
QPID-5576 : Detect closure of sockets better and do not leave threads waiting for input which will never come
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612555 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-client-websocket')
-rw-r--r--qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java42
1 files changed, 28 insertions, 14 deletions
diff --git a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
index 1805b593f1..cb1701b2fb 100644
--- a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
+++ b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
@@ -20,6 +20,18 @@
*/
package org.apache.qpid.amqp_1_0.client.websocket;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+
import org.apache.qpid.amqp_1_0.client.ConnectionException;
import org.apache.qpid.amqp_1_0.client.TransportProvider;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
@@ -29,16 +41,6 @@ import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketClient;
-import org.eclipse.jetty.websocket.WebSocketClientFactory;
-
-import javax.net.ssl.SSLContext;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
class WebSocketProvider implements TransportProvider
{
@@ -49,6 +51,7 @@ class WebSocketProvider implements TransportProvider
private static QueuedThreadPool _threadPool;
private final String _transport;
private static WebSocketClientFactory _factory;
+ private WebSocket.Connection _connection;
public WebSocketProvider(final String transport)
{
@@ -134,7 +137,7 @@ class WebSocketProvider implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- saslOut,
+ saslOut.asFrameSource(),
new HeaderFrameSource((byte)'A',
(byte)'M',
(byte)'Q',
@@ -143,7 +146,7 @@ class WebSocketProvider implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- out);
+ out.asFrameSource());
conn.setSaslFrameOutput(saslOut);
}
@@ -157,13 +160,13 @@ class WebSocketProvider implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- out);
+ out.asFrameSource());
}
final ConnectionHandler handler = new ConnectionHandler(conn);
conn.setFrameOutputHandler(out);
final URI uri = new URI(_transport +"://"+ address+":"+ port +"/");
- WebSocket.Connection connection = client.open(uri, new WebSocket.OnBinaryMessage()
+ _connection = client.open(uri, new WebSocket.OnBinaryMessage()
{
public void onOpen(Connection connection)
{
@@ -192,6 +195,11 @@ class WebSocketProvider implements TransportProvider
}
+ @Override
+ public void close()
+ {
+ _connection.close();
+ }
public static class HeaderFrameSource implements ConnectionHandler.FrameSource
@@ -225,6 +233,12 @@ class WebSocketProvider implements TransportProvider
return _closed;
}
+ @Override
+ public void close()
+ {
+ _closed = true;
+ }
+
}