summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-19 13:48:17 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-19 13:48:17 +0000
commitc24ca9c39287a58b3a5692e0744423fc88781906 (patch)
tree496c32352e26d69e76b0bd52226d3f855b4f2523 /java/client/src/main
parent65e0321d1b0145dc0d68a324e5a7cda661d5754b (diff)
downloadqpid-python-c24ca9c39287a58b3a5692e0744423fc88781906.tar.gz
QPID-379 Bounced Messages do not appear in connection exception listener.
The previous commit that started the Dispatcher was wrong and caused a lot of failures. This will address that problem by providing a thread pool on the client connection object to deliver bounced messages to the exception handler. Tidied up MessageListenerTests so all the asserts are in the given test. Renamed TestChannelCloseMethodHandlerNoCloseOk as surefire picks it up as a test case. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java101
2 files changed, 101 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index adf2a4bda2..1fb1c51890 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -62,6 +62,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -144,6 +147,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
+ private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+
/**
* @param broker brokerdetails
* @param username username
@@ -716,8 +722,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
+ long startCloseTime = System.currentTimeMillis();
+
+ _taskPool.shutdown();
closeAllSessions(null, timeout);
+
+ if (!_taskPool.isTerminated())
+ {
+ try
+ {
+ //adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+ _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
+ }
+
+ //adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+
_protocolHandler.closeConnection(timeout);
+
}
catch (AMQException e)
{
@@ -727,6 +756,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+ return timeout;
+ }
+
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -1147,4 +1187,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_temporaryQueueExchangeName = temporaryQueueExchangeName;
}
+
+ public void performConnectionTask(Runnable task)
+ {
+ _taskPool.execute(task);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index dc2ffc38c4..fe77acfabc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -72,7 +72,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -192,7 +191,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
@@ -277,42 +275,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- else
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
- message.getContentHeader(),
- message.getBodies());
-
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
-
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
}
public void close()
@@ -1384,7 +1346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
}
else
{
@@ -1474,8 +1436,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1634,9 +1596,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- startDistpatcherIfNecessary();
+ if (message.getDeliverBody() == null)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.add(message);
+ }
+ }
+
+ private void returnBouncedMessage(final UnprocessedMessage message)
+ {
+ _connection.performConnectionTask(
+ new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
+ false,
+ message.getBounceBody().exchange,
+ message.getBounceBody().routingKey,
+ message.getContentHeader(),
+ message.getBodies());
+
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+ AMQShortString reason = message.getBounceBody().replyText;
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
+ else
+ {
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
- _queue.add(message);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ });
}
/**
@@ -1882,7 +1894,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
- if(!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQTopic))
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
}
@@ -1917,7 +1929,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public int getTicket()
{
return _ticket;