diff options
Diffstat (limited to 'qpid/java')
10 files changed, 255 insertions, 101 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index d7bb546d7a..1a72e129e7 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -239,7 +239,12 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi public MessageImpl receive() throws JMSException { checkClosed(); - return receiveImpl(-1L); + MessageImpl message = receiveImpl(-1L); + if(message == null) + { + throw new JMSException("Message could not be retrieved"); + } + return message; } public MessageImpl receive(final long timeout) throws JMSException diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index bd67ff681a..0962e4aa37 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -891,9 +891,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession { synchronized(getLock()) { - while(!_closed) + + while(!(_closed || getClientSession().getEndpoint().isEnded())) { - while(!_closed && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))) + while(!(_closed || getClientSession().getEndpoint().isEnded()) && (!_started || (_recoveredMessage == null && _messageConsumerList.isEmpty()))) { try { @@ -904,7 +905,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession return; } } - while(!_closed && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))) + while(!(_closed || getClientSession().getEndpoint().isEnded()) && (_started && (_recoveredMessage != null || !_messageConsumerList.isEmpty()))) { Message msg; diff --git a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java index 1805b593f1..cb1701b2fb 100644 --- a/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java +++ b/qpid/java/amqp-1-0-client-websocket/src/main/java/org/apache/qpid/amqp_1_0/client/websocket/WebSocketProvider.java @@ -20,6 +20,18 @@ */ package org.apache.qpid.amqp_1_0.client.websocket; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketClient; +import org.eclipse.jetty.websocket.WebSocketClientFactory; + import org.apache.qpid.amqp_1_0.client.ConnectionException; import org.apache.qpid.amqp_1_0.client.TransportProvider; import org.apache.qpid.amqp_1_0.codec.FrameWriter; @@ -29,16 +41,6 @@ import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.eclipse.jetty.websocket.WebSocket; -import org.eclipse.jetty.websocket.WebSocketClient; -import org.eclipse.jetty.websocket.WebSocketClientFactory; - -import javax.net.ssl.SSLContext; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; class WebSocketProvider implements TransportProvider { @@ -49,6 +51,7 @@ class WebSocketProvider implements TransportProvider private static QueuedThreadPool _threadPool; private final String _transport; private static WebSocketClientFactory _factory; + private WebSocket.Connection _connection; public WebSocketProvider(final String transport) { @@ -134,7 +137,7 @@ class WebSocketProvider implements TransportProvider (byte)1, (byte)0, (byte)0), - saslOut, + saslOut.asFrameSource(), new HeaderFrameSource((byte)'A', (byte)'M', (byte)'Q', @@ -143,7 +146,7 @@ class WebSocketProvider implements TransportProvider (byte)1, (byte)0, (byte)0), - out); + out.asFrameSource()); conn.setSaslFrameOutput(saslOut); } @@ -157,13 +160,13 @@ class WebSocketProvider implements TransportProvider (byte)1, (byte)0, (byte)0), - out); + out.asFrameSource()); } final ConnectionHandler handler = new ConnectionHandler(conn); conn.setFrameOutputHandler(out); final URI uri = new URI(_transport +"://"+ address+":"+ port +"/"); - WebSocket.Connection connection = client.open(uri, new WebSocket.OnBinaryMessage() + _connection = client.open(uri, new WebSocket.OnBinaryMessage() { public void onOpen(Connection connection) { @@ -192,6 +195,11 @@ class WebSocketProvider implements TransportProvider } + @Override + public void close() + { + _connection.close(); + } public static class HeaderFrameSource implements ConnectionHandler.FrameSource @@ -225,6 +233,12 @@ class WebSocketProvider implements TransportProvider return _closed; } + @Override + public void close() + { + _closed = true; + } + } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 6157ec53f6..9319d4ddff 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -25,8 +25,10 @@ import java.security.Principal; import java.util.ServiceLoader; import java.util.concurrent.TimeoutException; -import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; +import javax.net.ssl.SSLContext; + import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.Predicate; @@ -37,8 +39,6 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.Error; -import javax.net.ssl.SSLContext; - public class Connection implements ExceptionHandler { private static final int MAX_FRAME_SIZE = 65536; @@ -225,7 +225,7 @@ public class Connection implements ExceptionHandler (byte)1, (byte)0, (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),_conn.getDescribedTypeRegistry()), new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', (byte)'M', (byte)'Q', @@ -234,7 +234,7 @@ public class Connection implements ExceptionHandler (byte)1, (byte)0, (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry()) ); _conn.setSaslFrameOutput(saslOut); @@ -249,7 +249,7 @@ public class Connection implements ExceptionHandler (byte)1, (byte)0, (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry()) ); } @@ -258,7 +258,14 @@ public class Connection implements ExceptionHandler transportProvider.connect(_conn,address,port, sslContext, this); - _conn.open(); + try + { + _conn.open(); + } + catch(RuntimeException e) + { + transportProvider.close(); + } } @@ -295,7 +302,14 @@ public class Connection implements ExceptionHandler { if(getEndpoint().isClosed()) { - throw new ConnectionClosedException(getEndpoint().getRemoteError()); + Error remoteError = getEndpoint().getRemoteError(); + if(remoteError == null) + { + remoteError = new Error(); + remoteError.setDescription("Connection closed for unknown reason"); + + } + throw new ConnectionClosedException(remoteError); } } @@ -377,7 +391,7 @@ public class Connection implements ExceptionHandler if(_connectionErrorTask != null) { Thread thread = new Thread(_connectionErrorTask); - thread.run(); + thread.start(); } } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index ad2924c01e..a2a15779d2 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -20,26 +20,41 @@ */ package org.apache.qpid.amqp_1_0.client; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; + import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.Predicate; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; - -import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Modified; +import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.AmqpError; +import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.Error; - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeoutException; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; public class Receiver implements DeliveryStateHandler { @@ -193,7 +208,8 @@ public class Receiver implements DeliveryStateHandler { if(_remoteErrorTask != null) { - _remoteErrorTask.run(); + Thread thread = new Thread(_remoteErrorTask); + thread.start(); } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 1addad2235..adeab4ab5d 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.amqp_1_0.client; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; @@ -27,22 +34,21 @@ import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.Predicate; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; -import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.Source; import org.apache.qpid.amqp_1_0.type.Target; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.*; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; - +import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; public class Sender implements DeliveryStateHandler { @@ -488,7 +494,8 @@ public class Sender implements DeliveryStateHandler { if(_remoteErrorTask != null) { - _remoteErrorTask.run(); + Thread thread = new Thread(_remoteErrorTask); + thread.start(); } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java index ee515c33ef..da084bdc7b 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java @@ -26,6 +26,9 @@ import java.io.OutputStream; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -39,6 +42,9 @@ import org.apache.qpid.amqp_1_0.type.SaslFrameBody; class TCPTransportProvier implements TransportProvider { + private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); + + private Socket _socket; private final String _transport; // Defines read socket timeout in milliseconds. A value of 0 means that the socket @@ -49,6 +55,7 @@ class TCPTransportProvier implements TransportProvider // the event of a SocketTimeoutException. A value of -1L will disable idle read timeout checking. // Default value is set to -1L, which means disable idle read checks. private long _readIdleTimeout = Long.getLong("qpid.connection_read_idle_timeout", -1L); + private final AtomicLong _threadNameIndex = new AtomicLong(); public TCPTransportProvier(final String transport) { @@ -64,7 +71,6 @@ class TCPTransportProvier implements TransportProvider { try { - final Socket s; if(sslContext != null) { final SSLSocketFactory socketFactory = sslContext.getSocketFactory(); @@ -72,16 +78,16 @@ class TCPTransportProvier implements TransportProvider SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(address, port); conn.setExternalPrincipal(sslSocket.getSession().getLocalPrincipal()); - s=sslSocket; + _socket=sslSocket; } else { - s = new Socket(address, port); + _socket = new Socket(address, port); } // set socket read timeout - s.setSoTimeout(_readTimeout); + _socket.setSoTimeout(_readTimeout); - conn.setRemoteAddress(s.getRemoteSocketAddress()); + conn.setRemoteAddress(_socket.getRemoteSocketAddress()); ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn); @@ -99,7 +105,7 @@ class TCPTransportProvier implements TransportProvider (byte)1, (byte)0, (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,conn.getDescribedTypeRegistry()), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),conn.getDescribedTypeRegistry()), new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', (byte)'M', (byte)'Q', @@ -108,7 +114,7 @@ class TCPTransportProvier implements TransportProvider (byte)1, (byte)0, (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry()) + new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry()) ); conn.setSaslFrameOutput(saslOut); @@ -123,22 +129,24 @@ class TCPTransportProvier implements TransportProvider (byte)1, (byte)0, (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry()) + new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry()) ); } - final OutputStream outputStream = s.getOutputStream(); + final OutputStream outputStream = _socket.getOutputStream(); ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler); - Thread outputThread = new Thread(outputHandler); + long threadIndex = _threadNameIndex.getAndIncrement(); + Thread outputThread = new Thread(outputHandler, "QpidConnectionOutputThread-"+threadIndex); + outputThread.setDaemon(true); outputThread.start(); conn.setFrameOutputHandler(out); final ConnectionHandler handler = new ConnectionHandler(conn); - final InputStream inputStream = s.getInputStream(); + final InputStream inputStream = _socket.getInputStream(); Thread inputThread = new Thread(new Runnable() { @@ -153,21 +161,11 @@ class TCPTransportProvier implements TransportProvider { if(conn.closedForInput() && conn.closedForOutput()) { - try - { - synchronized (outputStream) - { - s.close(); - } - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } + close(); } } } - }); + },"QpidConnectionInputThread-"+threadIndex); inputThread.setDaemon(true); inputThread.start(); @@ -178,6 +176,20 @@ class TCPTransportProvier implements TransportProvider throw new ConnectionException(e); } } + + @Override + public void close() + { + try + { + _socket.close(); + } + catch (IOException e) + { + RAW_LOGGER.log(Level.WARNING, "Unexpected Error during TCPTransportProvider socket close", e); + } + } + private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream) { byte[] buf = new byte[2<<15]; diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java index 2c11d6b6ef..71628679f8 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.amqp_1_0.client; -import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import javax.net.ssl.SSLContext; + import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.type.FrameBody; - -import javax.net.ssl.SSLContext; public interface TransportProvider { @@ -34,4 +32,6 @@ public interface TransportProvider int port, SSLContext sslContext, ExceptionHandler exceptionHandler) throws ConnectionException; + + void close(); } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java index 54a4f22d48..b5ab25c3fb 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.amqp_1_0.framing; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.qpid.amqp_1_0.codec.FrameWriter; import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler; @@ -27,25 +38,13 @@ import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.codec.ValueWriter; import org.apache.qpid.amqp_1_0.transport.BytesProcessor; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; - import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.transport.Open; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedShort; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.logging.Level; -import java.util.logging.Logger; +import org.apache.qpid.amqp_1_0.type.transport.Open; public class ConnectionHandler { @@ -87,7 +86,7 @@ public class ConnectionHandler // ---------------------------------------------------------------- - public static class FrameOutput<T> implements FrameOutputHandler<T>, FrameSource + public static class FrameOutput<T> implements FrameOutputHandler<T> { private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]); @@ -116,6 +115,39 @@ public class ConnectionHandler _conn = conn; } + public FrameSource asFrameSource() + { + return new FrameSource() + { + @Override + public AMQFrame getNextFrame(final boolean wait) + { + return FrameOutput.this.getNextFrame(wait); + } + + @Override + public boolean closed() + { + return FrameOutput.this.closed(); + } + + @Override + public void close() + { + FrameOutput.this.immediateClose(); + } + }; + } + + private void immediateClose() + { + synchronized (_conn.getLock()) + { + _closed = true; + _conn.getLock().notifyAll(); + } + } + public boolean canSend() { return _queue.remainingCapacity() != 0; @@ -239,6 +271,8 @@ public class ConnectionHandler { AMQFrame<T> getNextFrame(boolean wait); boolean closed(); + + void close(); } @@ -246,6 +280,8 @@ public class ConnectionHandler { void getBytes(BytesProcessor processor, boolean wait); boolean closed(); + + void close(); } public static class FrameToBytesSourceAdapter implements BytesSource @@ -320,6 +356,12 @@ public class ConnectionHandler { return _buffer.position() == 0 && _frameSource.closed(); } + + @Override + public void close() + { + _frameSource.close(); + } } @@ -344,6 +386,11 @@ public class ConnectionHandler { return !_buffer.hasRemaining(); } + + @Override + public void close() + { + } } public static class SequentialBytesSource implements BytesSource @@ -379,6 +426,19 @@ public class ConnectionHandler { return _sources.isEmpty(); } + + @Override + public void close() + { + BytesSource src = _sources.peek(); + while (src != null) + { + src.close(); + _sources.poll(); + src = _sources.peek(); + } + + } } @@ -420,6 +480,19 @@ public class ConnectionHandler { return _sources.isEmpty(); } + + @Override + public void close() + { + FrameSource src = _sources.peek(); + while (src != null) + { + src.close(); + _sources.poll(); + src = _sources.peek(); + } + + } } @@ -470,6 +543,7 @@ public class ConnectionHandler catch (IOException e) { _closed = true; + _bytesSource.close(); _exceptionHandler.handleException(e); } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index b48cdbe201..e47e4a3507 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -179,20 +179,27 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { if (_requiresSASLClient) { - synchronized (getLock()) + try { - while (!(_saslComplete || _closedForInput)) + waitUntil(new Predicate() { - try - { - getLock().wait(); - } - catch (InterruptedException e) + + @Override + public boolean isSatisfied() { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return _saslComplete || _closedForInput; } - } + }); + } + catch (TimeoutException e) + { + throw new RuntimeException("Could not connect - authentication error"); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); } + if (!_authenticated) { throw new RuntimeException("Could not connect - authentication error"); @@ -471,6 +478,10 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour } } } + if(_connectionEventListener != null) + { + _connectionEventListener.closeReceived(); + } } notifyAll(); } @@ -801,9 +812,9 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour return _describedTypeRegistry; } - public synchronized void setClosedForOutput(boolean b) + public synchronized void setClosedForOutput(boolean closed) { - _closedForOutput = true; + _closedForOutput = closed; notifyAll(); } |
