summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java101
2 files changed, 101 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index adf2a4bda2..1fb1c51890 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -62,6 +62,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
@@ -144,6 +147,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
+ private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+
/**
* @param broker brokerdetails
* @param username username
@@ -716,8 +722,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
try
{
+ long startCloseTime = System.currentTimeMillis();
+
+ _taskPool.shutdown();
closeAllSessions(null, timeout);
+
+ if (!_taskPool.isTerminated())
+ {
+ try
+ {
+ //adjust timeout
+ long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+ _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info("Interrupted while shutting down connection thread pool.");
+ }
+ }
+
+ //adjust timeout
+ timeout = adjustTimeout(timeout, startCloseTime);
+
_protocolHandler.closeConnection(timeout);
+
}
catch (AMQException e)
{
@@ -727,6 +756,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+ return timeout;
+ }
+
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -1147,4 +1187,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_temporaryQueueExchangeName = temporaryQueueExchangeName;
}
+
+ public void performConnectionTask(Runnable task)
+ {
+ _taskPool.execute(task);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index dc2ffc38c4..fe77acfabc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -72,7 +72,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -192,7 +191,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
@@ -277,42 +275,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- else
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.getBounceBody().exchange,
- message.getBounceBody().routingKey,
- message.getContentHeader(),
- message.getBodies());
-
- AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
- AMQShortString reason = message.getBounceBody().replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
-
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
}
public void close()
@@ -1384,7 +1346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (topicName.indexOf('/') == -1)
{
- return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
+ return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
}
else
{
@@ -1474,8 +1436,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// if the queue is bound to the exchange but NOT for this topic, then the JMS spec
// says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
{
deleteQueue(dest.getAMQQueueName());
}
@@ -1634,9 +1596,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- startDistpatcherIfNecessary();
+ if (message.getDeliverBody() == null)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.add(message);
+ }
+ }
+
+ private void returnBouncedMessage(final UnprocessedMessage message)
+ {
+ _connection.performConnectionTask(
+ new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
+ false,
+ message.getBounceBody().exchange,
+ message.getBounceBody().routingKey,
+ message.getContentHeader(),
+ message.getBodies());
+
+ AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+ AMQShortString reason = message.getBounceBody().replyText;
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
+ else
+ {
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
- _queue.add(message);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ });
}
/**
@@ -1882,7 +1894,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
}
- if(!(topic instanceof AMQTopic))
+ if (!(topic instanceof AMQTopic))
{
throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
}
@@ -1917,7 +1929,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
-
public int getTicket()
{
return _ticket;