summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
commit085486ebe5ff21133b9caf1c31625ac6ea356568 (patch)
tree7acbe9ca99a345dca71f9f80cd3e29ea4e3710f0 /qpid/java/client
parent60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (diff)
parente2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (diff)
downloadqpid-python-085486ebe5ff21133b9caf1c31625ac6ea356568.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java166
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java111
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java71
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java41
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java10
6 files changed, 249 insertions, 180 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 70d91ad817..4c596b88a0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -32,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -64,7 +65,9 @@ import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -192,6 +195,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private boolean _compressMessages;
private int _messageCompressionThresholdSize;
+ static
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Qpid version : " + QpidProperties.getVersionString());
+ }
+
+ // The registering of any additional SASL mechanisms with the Java Security API requires
+ // SecurityManager permissions. In execution environments such as web containers,
+ // this may require adjustments to the Java security.policy.
+ CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Loaded mechanisms " + registry.getMechanisms());
+ }
+ }
/**
* @param broker brokerdetails
* @param username username
@@ -847,13 +866,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close(long timeout) throws JMSException
{
- close(new ArrayList<AMQSession>(_sessions.values()), timeout);
- }
+ boolean closed;
- public void close(List<AMQSession> sessions, long timeout) throws JMSException
- {
- if (!setClosed())
+ synchronized (_sessionCreationLock)
+ {
+ closed = setClosed();
+ }
+
+ if (!closed)
{
+ List<AMQSession> sessions = new ArrayList<>(_sessions.values());
+
setClosing(true);
try
{
@@ -868,54 +891,52 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
{
- synchronized (_sessionCreationLock)
+ if (!sessions.isEmpty())
{
- if (!sessions.isEmpty())
+ AMQSession session = sessions.remove(0);
+ synchronized (session.getMessageDeliveryLock())
{
- AMQSession session = sessions.remove(0);
- synchronized (session.getMessageDeliveryLock())
- {
- doClose(sessions, timeout);
- }
+ doClose(sessions, timeout);
}
- else
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
{
- synchronized (getFailoverMutex())
+ try
{
try
{
- try
- {
- closeAllSessions(null, timeout);
- }
- finally
- {
- //This MUST occur after we have successfully closed all Channels/Sessions
- shutdownTaskPool(timeout);
- }
+ closeAllSessions(null, timeout);
}
- catch (JMSException e)
+ finally
{
- _logger.error("Error closing connection", e);
- JMSException jmse = new JMSException("Error closing connection: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
+ //This MUST occur after we have successfully closed all Channels/Sessions
+ shutdownTaskPool(timeout);
}
- finally
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error closing connection", e);
+ JMSException jmse = new JMSException("Error closing connection: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ finally
+ {
+ try
+ {
+ _delegate.closeConnection(timeout);
+ }
+ catch (Exception e)
{
- try
- {
- _delegate.closeConnection(timeout);
- }
- catch (Exception e)
- {
- _logger.warn("Error closing underlying protocol connection", e);
- }
+ _logger.warn("Error closing underlying protocol connection", e);
}
}
}
}
+
}
private void shutdownTaskPool(final long timeout)
@@ -1290,28 +1311,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
- // get the failover mutex before trying to close
- synchronized (getFailoverMutex())
+ // decide if we are going to close the session
+ if (hardError(cause))
{
- // decide if we are going to close the session
- if (hardError(cause))
- {
- closer = (!setClosed()) || closer;
- {
- _logger.info("Closing AMQConnection due to :" + cause);
- }
- }
- else
+ closer = (!setClosed()) || closer;
{
- _logger.info("Not a hard-error connection not closing: " + cause);
+ _logger.info("Closing AMQConnection due to :" + cause);
}
+ }
+ else
+ {
+ _logger.info("Not a hard-error connection not closing: " + cause);
+ }
- // if we are closing the connection, close sessions first
- if (closer)
+
+ // if we are closing the connection, close sessions first
+ if (closer)
+ {
+ // get the failover mutex before trying to close
+ synchronized (getFailoverMutex())
{
try
{
- closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor.
+ closeAllSessions(cause, -1);
}
catch (JMSException e)
{
@@ -1328,16 +1350,32 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, final Throwable cause)
{
- // deliver the exception if there is a listener
- ExceptionListener exceptionListener = getExceptionListenerNoCheck();
+ final ExceptionListener exceptionListener = getExceptionListenerNoCheck();
if (exceptionListener != null)
{
- exceptionListener.onException(je);
- }
- else
+ performConnectionTask(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // deliver the exception if there is a listener
+ try
+ {
+ exceptionListener.onException(je);
+ }
+ catch (RuntimeException e)
+ {
+ _logger.error("Exception occurred in ExceptionListener", e);
+ }
+ }
+ });
+ }
+ else
{
_logger.error("Throwable Received but no listener set: " + cause);
}
+
+
}
private boolean hardError(Throwable cause)
@@ -1448,7 +1486,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void performConnectionTask(Runnable task)
{
- _taskPool.execute(task);
+ try
+ {
+ _taskPool.execute(task);
+ }
+ catch (RejectedExecutionException e)
+ {
+ if(!(isClosed() || isClosing()))
+ {
+ throw e;
+ }
+ }
}
public AMQSession getSession(int channelId)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 4e9164c3b0..fdeab7ae70 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -291,7 +291,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void closed(Connection conn)
{
- ConnectionException exc = exception;
+ final ConnectionException exc = exception;
exception = null;
if (exc == null)
@@ -299,7 +299,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return;
}
- ConnectionClose close = exc.getClose();
+ final ConnectionClose close = exc.getClose();
if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
@@ -332,23 +332,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
_conn.setClosed();
- ExceptionListener listener = _conn.getExceptionListenerNoCheck();
+ final ExceptionListener listener = _conn.getExceptionListenerNoCheck();
if (listener == null)
{
_logger.error("connection exception: " + conn, exc);
}
else
{
- String code = null;
- if (close != null)
+ _conn.performConnectionTask(new Runnable()
{
- code = close.getReplyCode().toString();
- }
+ @Override
+ public void run()
+ {
+ String code = null;
+ if (close != null)
+ {
+ code = close.getReplyCode().toString();
+ }
+
+ JMSException ex = new JMSException(exc.getMessage(), code);
+ ex.setLinkedException(exc);
+ ex.initCause(exc);
+ listener.onException(ex);
+ }
+ });
- JMSException ex = new JMSException(exc.getMessage(), code);
- ex.setLinkedException(exc);
- ex.initCause(exc);
- listener.onException(ex);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 29460bb42d..12e9285af8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -808,7 +808,16 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*/
public void close(long timeout) throws JMSException
{
- close(timeout, true);
+ synchronized (_messageDeliveryLock)
+ {
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (getFailoverMutex())
+ {
+
+ close(timeout, true);
+ }
+ }
}
private void close(long timeout, boolean sendClose) throws JMSException
@@ -822,52 +831,44 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!setClosed())
{
setClosing(true);
- synchronized (getFailoverMutex())
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
+
+ try
{
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_messageDeliveryLock)
+ // If the connection is open or we are in the process
+ // of closing the connection then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_connection.isClosed() || _connection.isClosing())
{
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
- {
- // If the connection is open or we are in the process
- // of closing the connection then send a cance
- // no point otherwise as the connection will be gone
- if (!_connection.isClosed() || _connection.isClosing())
- {
- if (sendClose)
- {
- sendClose(timeout);
- }
- }
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error closing session: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- // This is ignored because the channel is already marked as closed so the fail-over process will
- // not re-open it.
- catch (FailoverException e)
- {
- _logger.debug(
- "Got FailoverException during channel close, ignored as channel already marked as closed.");
- }
- catch (TransportException e)
- {
- throw toJMSException("Error closing session:" + e.getMessage(), e);
- }
- finally
+ if (sendClose)
{
- _connection.deregisterSession(_channelId);
+ sendClose(timeout);
}
}
}
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error closing session: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ // This is ignored because the channel is already marked as closed so the fail-over process will
+ // not re-open it.
+ catch (FailoverException e)
+ {
+ _logger.debug(
+ "Got FailoverException during channel close, ignored as channel already marked as closed.");
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Error closing session:" + e.getMessage(), e);
+ }
+ finally
+ {
+ _connection.deregisterSession(_channelId);
+ }
}
}
@@ -899,24 +900,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (!setClosed())
{
- synchronized (_messageDeliveryLock)
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
-
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
}
+
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
}
+
}
protected void stopDispatcherThread()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index bb0f0d9b13..143de271a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
private void returnBouncedMessage(final ReturnMessage msg)
{
- getAMQConnection().performConnectionTask(new Runnable()
+ try
{
- public void run()
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ getMessageFactoryRegistry().createMessage(0,
+ false,
+ msg.getExchange(),
+ msg.getRoutingKey(),
+ msg.getContentHeader(),
+ msg.getBodies(),
+ _queueDestinationCache,
+ _topicDestinationCache,
+ AMQDestination.UNKNOWN_TYPE);
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
{
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
- getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache,
- _topicDestinationCache, AMQDestination.UNKNOWN_TYPE);
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- } else if (errorCode == AMQConstant.NO_ROUTE)
- {
- getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- } else
- {
- getAMQConnection().exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
- }
-
- } catch (Exception e)
- {
- _logger.error(
- "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
- e);
- }
+ getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason,
+ bouncedMessage,
+ null));
}
- });
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ getAMQConnection().exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index b1e606b8e9..9cef1f8dce 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -598,35 +598,38 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (sendClose)
{
+
// The Synchronized block only needs to protect network traffic.
- synchronized (_connection.getFailoverMutex())
+
+ try
{
- try
+ // If the session is open or we are in the process
+ // of closing the session then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_session.isClosed() || _session.isClosing())
{
- // If the session is open or we are in the process
- // of closing the session then send a cance
- // no point otherwise as the connection will be gone
- if (!_session.isClosed() || _session.isClosing())
+ synchronized(_session.getMessageDeliveryLock())
{
- synchronized(_session.getMessageDeliveryLock())
+ synchronized (_connection.getFailoverMutex())
{
sendCancel();
}
}
}
- catch (AMQException e)
- {
- throw new JMSAMQException("Error closing consumer: " + e, e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
- }
- catch (TransportException e)
- {
- throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
- }
}
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Error closing consumer: " + e, e);
+ }
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+ }
+ catch (TransportException e)
+ {
+ throw _session.toJMSException("Exception while closing consumer: " + e.getMessage(), e);
+ }
+
}
else
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
index 009598d8a4..ceb2a323ca 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.url.URLSyntaxException;
-import java.io.IOException;
-
public class MockAMQConnection extends AMQConnection
{
public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost)
@@ -60,4 +60,10 @@ public class MockAMQConnection extends AMQConnection
{
return super.getDelegate();
}
+
+ @Override
+ public void performConnectionTask(final Runnable task)
+ {
+ task.run();
+ }
}