diff options
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 45 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 101 |
2 files changed, 101 insertions, 45 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index adf2a4bda2..1fb1c51890 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -62,6 +62,9 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { @@ -144,6 +147,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ + private final ExecutorService _taskPool = Executors.newCachedThreadPool(); + /** * @param broker brokerdetails * @param username username @@ -716,8 +722,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { + long startCloseTime = System.currentTimeMillis(); + + _taskPool.shutdown(); closeAllSessions(null, timeout); + + if (!_taskPool.isTerminated()) + { + try + { + //adjust timeout + long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); + + _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } + } + + //adjust timeout + timeout = adjustTimeout(timeout, startCloseTime); + _protocolHandler.closeConnection(timeout); + } catch (AMQException e) { @@ -727,6 +756,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + private long adjustTimeout(long timeout, long startTime) + { + long now = System.currentTimeMillis(); + timeout -= now - startTime; + if (timeout < 0) + { + timeout = 0; + } + return timeout; + } + /** * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to * mark objects "visible" in userland as closed after failover or other significant event that impacts the @@ -1147,4 +1187,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _temporaryQueueExchangeName = temporaryQueueExchangeName; } + + public void performConnectionTask(Runnable task) + { + _taskPool.execute(task); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index dc2ffc38c4..fe77acfabc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -72,7 +72,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; @@ -192,7 +191,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _hasMessageListeners; - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread @@ -277,42 +275,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - else - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, - false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, - message.getContentHeader(), - message.getBodies()); - - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } - - } - catch (Exception e) - { - _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } } public void close() @@ -1384,7 +1346,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (topicName.indexOf('/') == -1) { - return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName)); + return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); } else { @@ -1474,8 +1436,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // if the queue is bound to the exchange but NOT for this topic, then the JMS spec // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) && - !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName)) + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) && + !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) { deleteQueue(dest.getAMQQueueName()); } @@ -1634,9 +1596,59 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + "] received in session with channel id " + _channelId); } - startDistpatcherIfNecessary(); + if (message.getDeliverBody() == null) + { + // Return of the bounced message. + returnBouncedMessage(message); + } + else + { + _queue.add(message); + } + } + + private void returnBouncedMessage(final UnprocessedMessage message) + { + _connection.performConnectionTask( + new Runnable() + { + public void run() + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, + false, + message.getBounceBody().exchange, + message.getBounceBody().routingKey, + message.getContentHeader(), + message.getBodies()); + + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); + AMQShortString reason = message.getBounceBody().replyText; + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. + if (errorCode == AMQConstant.NO_CONSUMERS) + { + _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); + } + else + { + _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } - _queue.add(message); + } + catch (Exception e) + { + _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + }); } /** @@ -1882,7 +1894,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session"); } - if(!(topic instanceof AMQTopic)) + if (!(topic instanceof AMQTopic)) { throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName()); } @@ -1917,7 +1929,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public int getTicket() { return _ticket; |
