diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-10 13:37:00 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-10 13:37:00 +0000 |
| commit | e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (patch) | |
| tree | f359ad52b3a352b099103df8df109113424ab8df /qpid/java/client | |
| parent | 6bca3754c2b893ae0a27d3c11559f25c9b1e7ea4 (diff) | |
| download | qpid-python-e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7.tar.gz | |
QPID-6374 : tidyup calls to connection task pool
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658714 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 40 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 71 |
2 files changed, 67 insertions, 44 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 5518435b94..4c596b88a0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1353,16 +1354,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (exceptionListener != null) { performConnectionTask(new Runnable() - { - @Override - public void run() - { - // deliver the exception if there is a listener - exceptionListener.onException(je); - } - }); - } - else + { + @Override + public void run() + { + // deliver the exception if there is a listener + try + { + exceptionListener.onException(je); + } + catch (RuntimeException e) + { + _logger.error("Exception occurred in ExceptionListener", e); + } + } + }); + } + else { _logger.error("Throwable Received but no listener set: " + cause); } @@ -1478,7 +1486,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void performConnectionTask(Runnable task) { - _taskPool.execute(task); + try + { + _taskPool.execute(task); + } + catch (RejectedExecutionException e) + { + if(!(isClosed() || isClosing())) + { + throw e; + } + } } public AMQSession getSession(int channelId) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index bb0f0d9b13..143de271a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe private void returnBouncedMessage(final ReturnMessage msg) { - getAMQConnection().performConnectionTask(new Runnable() + try { - public void run() + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = + getMessageFactoryRegistry().createMessage(0, + false, + msg.getExchange(), + msg.getRoutingKey(), + msg.getContentHeader(), + msg.getBodies(), + _queueDestinationCache, + _topicDestinationCache, + AMQDestination.UNKNOWN_TYPE); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = - getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, - _topicDestinationCache, AMQDestination.UNKNOWN_TYPE); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)); - } else if (errorCode == AMQConstant.NO_ROUTE) - { - getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); - } else - { - getAMQConnection().exceptionReceived( - new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); - } - - } catch (Exception e) - { - _logger.error( - "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", - e); - } + getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, + bouncedMessage, + null)); } - }); + else if (errorCode == AMQConstant.NO_ROUTE) + { + getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null)); + } + else + { + getAMQConnection().exceptionReceived( + new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null)); + } + + } + catch (Exception e) + { + _logger.error( + "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", + e); + } } |
