summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-22 12:18:33 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-22 12:18:33 +0000
commitcf4fa14c7827f3f464a2edb99629f091b00947a2 (patch)
treebaae3ab68e1a36d3fe2dec1de9ff09df01916822 /qpid/java
parent6738190a2b499f35dd7a7329a2c8d9cd384ab752 (diff)
downloadqpid-python-cf4fa14c7827f3f464a2edb99629f091b00947a2.tar.gz
QPID-5576 : Detect closure of sockets better and do not leave threads waiting for input which will never come
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1612555 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java7
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java7
-rw-r--r--qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java42
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java32
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java36
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java31
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java58
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java8
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java102
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java33
10 files changed, 255 insertions, 101 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index d7bb546d7a..1a72e129e7 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -239,7 +239,12 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
public MessageImpl receive() throws JMSException
{
checkClosed();
- return receiveImpl(-1L);
+ MessageImpl message = receiveImpl(-1L);
+ if(message == null)
+ {
+ throw new JMSException("Message could not be retrieved");
+ }
+ return message;
}
public MessageImpl receive(final long timeout) throws JMSException
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index bd67ff681a..0962e4aa37 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -891,9 +891,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession
{
synchronized(getLock())
{
- while(!_closed)
+
+ while(!(_closed || getClientSession().getEndpoint().isEnded()))
{
- while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
+ while(!(_closed || getClientSession().getEndpoint().isEnded()) && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty())))
{
try
{
@@ -904,7 +905,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
return;
}
}
- while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
+ while(!(_closed || getClientSession().getEndpoint().isEnded()) && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty())))
{
Message msg;
diff --git a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
index 1805b593f1..cb1701b2fb 100644
--- a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
+++ b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java
@@ -20,6 +20,18 @@
*/
package org.apache.qpid.amqp_1_0.client.websocket;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+
import org.apache.qpid.amqp_1_0.client.ConnectionException;
import org.apache.qpid.amqp_1_0.client.TransportProvider;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
@@ -29,16 +41,6 @@ import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketClient;
-import org.eclipse.jetty.websocket.WebSocketClientFactory;
-
-import javax.net.ssl.SSLContext;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
class WebSocketProvider implements TransportProvider
{
@@ -49,6 +51,7 @@ class WebSocketProvider implements TransportProvider
private static QueuedThreadPool _threadPool;
private final String _transport;
private static WebSocketClientFactory _factory;
+ private WebSocket.Connection _connection;
public WebSocketProvider(final String transport)
{
@@ -134,7 +137,7 @@ class WebSocketProvider implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- saslOut,
+ saslOut.asFrameSource(),
new HeaderFrameSource((byte)'A',
(byte)'M',
(byte)'Q',
@@ -143,7 +146,7 @@ class WebSocketProvider implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- out);
+ out.asFrameSource());
conn.setSaslFrameOutput(saslOut);
}
@@ -157,13 +160,13 @@ class WebSocketProvider implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- out);
+ out.asFrameSource());
}
final ConnectionHandler handler = new ConnectionHandler(conn);
conn.setFrameOutputHandler(out);
final URI uri = new URI(_transport +"://"+ address+":"+ port +"/");
- WebSocket.Connection connection = client.open(uri, new WebSocket.OnBinaryMessage()
+ _connection = client.open(uri, new WebSocket.OnBinaryMessage()
{
public void onOpen(Connection connection)
{
@@ -192,6 +195,11 @@ class WebSocketProvider implements TransportProvider
}
+ @Override
+ public void close()
+ {
+ _connection.close();
+ }
public static class HeaderFrameSource implements ConnectionHandler.FrameSource
@@ -225,6 +233,12 @@ class WebSocketProvider implements TransportProvider
return _closed;
}
+ @Override
+ public void close()
+ {
+ _closed = true;
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index 6157ec53f6..9319d4ddff 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -25,8 +25,10 @@ import java.security.Principal;
import java.util.ServiceLoader;
import java.util.concurrent.TimeoutException;
-import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
+import javax.net.ssl.SSLContext;
+
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.Predicate;
@@ -37,8 +39,6 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-import javax.net.ssl.SSLContext;
-
public class Connection implements ExceptionHandler
{
private static final int MAX_FRAME_SIZE = 65536;
@@ -225,7 +225,7 @@ public class Connection implements ExceptionHandler
(byte)1,
(byte)0,
(byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),_conn.getDescribedTypeRegistry()),
new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
(byte)'M',
(byte)'Q',
@@ -234,7 +234,7 @@ public class Connection implements ExceptionHandler
(byte)1,
(byte)0,
(byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry())
);
_conn.setSaslFrameOutput(saslOut);
@@ -249,7 +249,7 @@ public class Connection implements ExceptionHandler
(byte)1,
(byte)0,
(byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry())
);
}
@@ -258,7 +258,14 @@ public class Connection implements ExceptionHandler
transportProvider.connect(_conn,address,port, sslContext, this);
- _conn.open();
+ try
+ {
+ _conn.open();
+ }
+ catch(RuntimeException e)
+ {
+ transportProvider.close();
+ }
}
@@ -295,7 +302,14 @@ public class Connection implements ExceptionHandler
{
if(getEndpoint().isClosed())
{
- throw new ConnectionClosedException(getEndpoint().getRemoteError());
+ Error remoteError = getEndpoint().getRemoteError();
+ if(remoteError == null)
+ {
+ remoteError = new Error();
+ remoteError.setDescription("Connection closed for unknown reason");
+
+ }
+ throw new ConnectionClosedException(remoteError);
}
}
@@ -377,7 +391,7 @@ public class Connection implements ExceptionHandler
if(_connectionErrorTask != null)
{
Thread thread = new Thread(_connectionErrorTask);
- thread.run();
+ thread.start();
}
}
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index ad2924c01e..a2a15779d2 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -20,26 +20,41 @@
*/
package org.apache.qpid.amqp_1_0.client;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
+
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-
-import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeoutException;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
public class Receiver implements DeliveryStateHandler
{
@@ -193,7 +208,8 @@ public class Receiver implements DeliveryStateHandler
{
if(_remoteErrorTask != null)
{
- _remoteErrorTask.run();
+ Thread thread = new Thread(_remoteErrorTask);
+ thread.start();
}
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
index 1addad2235..adeab4ab5d 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.amqp_1_0.client;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
@@ -27,22 +34,21 @@ import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.Source;
import org.apache.qpid.amqp_1_0.type.Target;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
public class Sender implements DeliveryStateHandler
{
@@ -488,7 +494,8 @@ public class Sender implements DeliveryStateHandler
{
if(_remoteErrorTask != null)
{
- _remoteErrorTask.run();
+ Thread thread = new Thread(_remoteErrorTask);
+ thread.start();
}
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
index ee515c33ef..da084bdc7b 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
@@ -26,6 +26,9 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -39,6 +42,9 @@ import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
class TCPTransportProvier implements TransportProvider
{
+ private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+
+ private Socket _socket;
private final String _transport;
// Defines read socket timeout in milliseconds. A value of 0 means that the socket
@@ -49,6 +55,7 @@ class TCPTransportProvier implements TransportProvider
// the event of a SocketTimeoutException. A value of -1L will disable idle read timeout checking.
// Default value is set to -1L, which means disable idle read checks.
private long _readIdleTimeout = Long.getLong("qpid.connection_read_idle_timeout", -1L);
+ private final AtomicLong _threadNameIndex = new AtomicLong();
public TCPTransportProvier(final String transport)
{
@@ -64,7 +71,6 @@ class TCPTransportProvier implements TransportProvider
{
try
{
- final Socket s;
if(sslContext != null)
{
final SSLSocketFactory socketFactory = sslContext.getSocketFactory();
@@ -72,16 +78,16 @@ class TCPTransportProvier implements TransportProvider
SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(address, port);
conn.setExternalPrincipal(sslSocket.getSession().getLocalPrincipal());
- s=sslSocket;
+ _socket=sslSocket;
}
else
{
- s = new Socket(address, port);
+ _socket = new Socket(address, port);
}
// set socket read timeout
- s.setSoTimeout(_readTimeout);
+ _socket.setSoTimeout(_readTimeout);
- conn.setRemoteAddress(s.getRemoteSocketAddress());
+ conn.setRemoteAddress(_socket.getRemoteSocketAddress());
ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn);
@@ -99,7 +105,7 @@ class TCPTransportProvier implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),conn.getDescribedTypeRegistry()),
new ConnectionHandler.HeaderBytesSource(conn, (byte)'A',
(byte)'M',
(byte)'Q',
@@ -108,7 +114,7 @@ class TCPTransportProvier implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry())
+ new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry())
);
conn.setSaslFrameOutput(saslOut);
@@ -123,22 +129,24 @@ class TCPTransportProvier implements TransportProvider
(byte)1,
(byte)0,
(byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry())
+ new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry())
);
}
- final OutputStream outputStream = s.getOutputStream();
+ final OutputStream outputStream = _socket.getOutputStream();
ConnectionHandler.BytesOutputHandler outputHandler =
new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler);
- Thread outputThread = new Thread(outputHandler);
+ long threadIndex = _threadNameIndex.getAndIncrement();
+ Thread outputThread = new Thread(outputHandler, "QpidConnectionOutputThread-"+threadIndex);
+
outputThread.setDaemon(true);
outputThread.start();
conn.setFrameOutputHandler(out);
final ConnectionHandler handler = new ConnectionHandler(conn);
- final InputStream inputStream = s.getInputStream();
+ final InputStream inputStream = _socket.getInputStream();
Thread inputThread = new Thread(new Runnable()
{
@@ -153,21 +161,11 @@ class TCPTransportProvier implements TransportProvider
{
if(conn.closedForInput() && conn.closedForOutput())
{
- try
- {
- synchronized (outputStream)
- {
- s.close();
- }
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
+ close();
}
}
}
- });
+ },"QpidConnectionInputThread-"+threadIndex);
inputThread.setDaemon(true);
inputThread.start();
@@ -178,6 +176,20 @@ class TCPTransportProvier implements TransportProvider
throw new ConnectionException(e);
}
}
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch (IOException e)
+ {
+ RAW_LOGGER.log(Level.WARNING, "Unexpected Error during TCPTransportProvider socket close", e);
+ }
+ }
+
private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream)
{
byte[] buf = new byte[2<<15];
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
index 2c11d6b6ef..71628679f8 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.amqp_1_0.client;
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import javax.net.ssl.SSLContext;
+
import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-
-import javax.net.ssl.SSLContext;
public interface TransportProvider
{
@@ -34,4 +32,6 @@ public interface TransportProvider
int port,
SSLContext sslContext,
ExceptionHandler exceptionHandler) throws ConnectionException;
+
+ void close();
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
index 54a4f22d48..b5ab25c3fb 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
@@ -20,6 +20,17 @@
*/
package org.apache.qpid.amqp_1_0.framing;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
@@ -27,25 +38,13 @@ import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
import org.apache.qpid.amqp_1_0.transport.BytesProcessor;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.transport.Open;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedShort;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import org.apache.qpid.amqp_1_0.type.transport.Open;
public class ConnectionHandler
{
@@ -87,7 +86,7 @@ public class ConnectionHandler
// ----------------------------------------------------------------
- public static class FrameOutput<T> implements FrameOutputHandler<T>, FrameSource
+ public static class FrameOutput<T> implements FrameOutputHandler<T>
{
private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]);
@@ -116,6 +115,39 @@ public class ConnectionHandler
_conn = conn;
}
+ public FrameSource asFrameSource()
+ {
+ return new FrameSource()
+ {
+ @Override
+ public AMQFrame getNextFrame(final boolean wait)
+ {
+ return FrameOutput.this.getNextFrame(wait);
+ }
+
+ @Override
+ public boolean closed()
+ {
+ return FrameOutput.this.closed();
+ }
+
+ @Override
+ public void close()
+ {
+ FrameOutput.this.immediateClose();
+ }
+ };
+ }
+
+ private void immediateClose()
+ {
+ synchronized (_conn.getLock())
+ {
+ _closed = true;
+ _conn.getLock().notifyAll();
+ }
+ }
+
public boolean canSend()
{
return _queue.remainingCapacity() != 0;
@@ -239,6 +271,8 @@ public class ConnectionHandler
{
AMQFrame<T> getNextFrame(boolean wait);
boolean closed();
+
+ void close();
}
@@ -246,6 +280,8 @@ public class ConnectionHandler
{
void getBytes(BytesProcessor processor, boolean wait);
boolean closed();
+
+ void close();
}
public static class FrameToBytesSourceAdapter implements BytesSource
@@ -320,6 +356,12 @@ public class ConnectionHandler
{
return _buffer.position() == 0 && _frameSource.closed();
}
+
+ @Override
+ public void close()
+ {
+ _frameSource.close();
+ }
}
@@ -344,6 +386,11 @@ public class ConnectionHandler
{
return !_buffer.hasRemaining();
}
+
+ @Override
+ public void close()
+ {
+ }
}
public static class SequentialBytesSource implements BytesSource
@@ -379,6 +426,19 @@ public class ConnectionHandler
{
return _sources.isEmpty();
}
+
+ @Override
+ public void close()
+ {
+ BytesSource src = _sources.peek();
+ while (src != null)
+ {
+ src.close();
+ _sources.poll();
+ src = _sources.peek();
+ }
+
+ }
}
@@ -420,6 +480,19 @@ public class ConnectionHandler
{
return _sources.isEmpty();
}
+
+ @Override
+ public void close()
+ {
+ FrameSource src = _sources.peek();
+ while (src != null)
+ {
+ src.close();
+ _sources.poll();
+ src = _sources.peek();
+ }
+
+ }
}
@@ -470,6 +543,7 @@ public class ConnectionHandler
catch (IOException e)
{
_closed = true;
+ _bytesSource.close();
_exceptionHandler.handleException(e);
}
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index b48cdbe201..e47e4a3507 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -179,20 +179,27 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
{
if (_requiresSASLClient)
{
- synchronized (getLock())
+ try
{
- while (!(_saslComplete || _closedForInput))
+ waitUntil(new Predicate()
{
- try
- {
- getLock().wait();
- }
- catch (InterruptedException e)
+
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _saslComplete || _closedForInput;
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException("Could not connect - authentication error");
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
}
+
if (!_authenticated)
{
throw new RuntimeException("Could not connect - authentication error");
@@ -471,6 +478,10 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
}
}
}
+ if(_connectionEventListener != null)
+ {
+ _connectionEventListener.closeReceived();
+ }
}
notifyAll();
}
@@ -801,9 +812,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
return _describedTypeRegistry;
}
- public synchronized void setClosedForOutput(boolean b)
+ public synchronized void setClosedForOutput(boolean closed)
{
- _closedForOutput = true;
+ _closedForOutput = closed;
notifyAll();
}