summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-10 13:37:00 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-10 13:37:00 +0000
commite2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (patch)
treef359ad52b3a352b099103df8df109113424ab8df /qpid/java/client
parent6bca3754c2b893ae0a27d3c11559f25c9b1e7ea4 (diff)
downloadqpid-python-e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7.tar.gz
QPID-6374 : tidyup calls to connection task pool
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658714 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java71
2 files changed, 67 insertions, 44 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5518435b94..4c596b88a0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -32,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -1353,16 +1354,23 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (exceptionListener != null)
{
performConnectionTask(new Runnable()
- {
- @Override
- public void run()
- {
- // deliver the exception if there is a listener
- exceptionListener.onException(je);
- }
- });
- }
- else
+ {
+ @Override
+ public void run()
+ {
+ // deliver the exception if there is a listener
+ try
+ {
+ exceptionListener.onException(je);
+ }
+ catch (RuntimeException e)
+ {
+ _logger.error("Exception occurred in ExceptionListener", e);
+ }
+ }
+ });
+ }
+ else
{
_logger.error("Throwable Received but no listener set: " + cause);
}
@@ -1478,7 +1486,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void performConnectionTask(Runnable task)
{
- _taskPool.execute(task);
+ try
+ {
+ _taskPool.execute(task);
+ }
+ catch (RejectedExecutionException e)
+ {
+ if(!(isClosed() || isClosing()))
+ {
+ throw e;
+ }
+ }
}
public AMQSession getSession(int channelId)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index bb0f0d9b13..143de271a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -772,42 +772,47 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
private void returnBouncedMessage(final ReturnMessage msg)
{
- getAMQConnection().performConnectionTask(new Runnable()
+ try
{
- public void run()
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ getMessageFactoryRegistry().createMessage(0,
+ false,
+ msg.getExchange(),
+ msg.getRoutingKey(),
+ msg.getContentHeader(),
+ msg.getBodies(),
+ _queueDestinationCache,
+ _topicDestinationCache,
+ AMQDestination.UNKNOWN_TYPE);
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
{
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
- getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache,
- _topicDestinationCache, AMQDestination.UNKNOWN_TYPE);
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- } else if (errorCode == AMQConstant.NO_ROUTE)
- {
- getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- } else
- {
- getAMQConnection().exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
- }
-
- } catch (Exception e)
- {
- _logger.error(
- "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
- e);
- }
+ getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason,
+ bouncedMessage,
+ null));
}
- });
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ getAMQConnection().exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
}