summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java88
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java206
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java27
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java95
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java40
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java89
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java62
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java114
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java100
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java16
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java360
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java36
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java17
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java17
28 files changed, 1355 insertions, 113 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 76b6dad996..01a0d9900d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -20,8 +20,8 @@ package org.apache.qpid.server;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -243,7 +244,13 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
*/
public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException
{
- AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName));
+ createNewQueue(queueName, owner, durable, null);
+ }
+
+ public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException
+ {
+ final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName);
+ AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
if (queue != null)
{
throw new JMException("The queue \"" + queueName + "\" already exists.");
@@ -258,11 +265,18 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
ownerShortString = new AMQShortString(owner);
}
+ FieldTable args = null;
+ if(arguments != null)
+ {
+ args = FieldTable.convertToFieldTable(arguments);
+ }
final VirtualHost virtualHost = getVirtualHost();
- queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, virtualHost, null);
+
+ queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString,
+ false, false, getVirtualHost(), args);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _durableConfig.createQueue(queue);
+ _durableConfig.createQueue(queue, args);
}
virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 34bc57a826..a4fd997568 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -692,6 +693,31 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
+ public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
+ {
+ final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ if (queueEntry != null)
+ {
+ final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ return maximumDeliveryCount > 0;
+ }
+
+ return false;
+ }
+
+ public boolean isDeliveredTooManyTimes(final long deliveryTag)
+ {
+ final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ if (queueEntry != null)
+ {
+ final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int numDeliveries = queueEntry.getDeliveryCount();
+ return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
+ }
+
+ return false;
+ }
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
@@ -739,9 +765,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
+ //Amend the delivery counter as the client hasn't seen these messages yet.
+ message.decrementDeliveryCount();
-
- ServerMessage msg = message.getMessage();
AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
@@ -799,6 +825,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
+
+ //Amend the delivery counter as the client hasn't seen these messages yet.
+ message.decrementDeliveryCount();
+
_unacknowledgedMessageMap.remove(deliveryTag);
message.setRedelivered();
@@ -1058,6 +1088,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
_session.registerMessageDelivered(entry.getMessage().getSize());
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag, sub.getConsumerTag());
+ entry.incrementDeliveryCount();
}
};
@@ -1246,7 +1277,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
{
private final Collection<QueueEntry> _ackedMessages;
-
public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
{
_ackedMessages = ackedMessages;
@@ -1479,4 +1509,54 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
}
}
}
+
+ public void deadLetter(long deliveryTag) throws AMQException
+ {
+ final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+ final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+
+ if (rejectedQueueEntry == null)
+ {
+ _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+ return;
+ }
+ else
+ {
+ final ServerMessage msg = rejectedQueueEntry.getMessage();
+
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+
+ final Exchange altExchange = queue.getAlternateExchange();
+ unackedMap.remove(deliveryTag);
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ rejectedQueueEntry.discard();
+ return;
+ }
+
+ final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
+
+ final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m);
+
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ rejectedQueueEntry.discard();
+ return;
+ }
+
+ rejectedQueueEntry.routeToAlternate();
+
+ //output operational logging for each delivery post commit
+ for (final BaseQueue destinationQueue : destinationQueues)
+ {
+ _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
+ }
+
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index 31c683b548..b8437c8430 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -63,7 +63,9 @@ public class QueueConfiguration extends ConfigurationPlugin
"flowResumeCapacity",
"lvq",
"lvqKey",
- "sortKey"
+ "sortKey",
+ "maximumDeliveryCount",
+ "deadLetterQueues"
};
}
@@ -173,6 +175,19 @@ public class QueueConfiguration extends ConfigurationPlugin
return getStringValue("sortKey", null);
}
+ public int getMaxDeliveryCount()
+ {
+ return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount());
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
+ }
+
public static class QueueConfig extends ConfigurationPlugin
{
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index e13e2f4d8f..4b42e39aa1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -40,6 +40,8 @@ import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.signal.SignalHandlerTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -810,4 +812,34 @@ public class ServerConfiguration extends ConfigurationPlugin
{
return getBooleanValue("management.managementRightsInferAllAccess", true);
}
+
+ public int getMaxDeliveryCount()
+ {
+ return getConfig().getInt("maximumDeliveryCount", 0);
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, defaults to disabled if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getConfig().getBoolean("deadLetterQueues", false);
+ }
+
+ /**
+ * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+ */
+ public String getDeadLetterExchangeSuffix()
+ {
+ return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+ /**
+ * String to affix to end of queue name when generating a queue for DLQ purposes.
+ */
+ public String getDeadLetterQueueSuffix()
+ {
+ return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 6729a5ce0f..c4e4f701a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -347,4 +347,18 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
{
return getLongValue("transactionTimeout.idleClose", 0L);
}
+
+ public int getMaxDeliveryCount()
+ {
+ return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount());
+ }
+
+ /**
+ * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set.
+ */
+ public boolean isDeadLetterQueueEnabled()
+ {
+ return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled());
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 7837a9bc38..102cd42ac3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+ public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
private final VirtualHost _host;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index bd75f7bc51..76f86ea1b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -117,7 +117,7 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
- return _queues.contains(queue);
+ return _queues.containsKey(queue);
}
public boolean isBound(AMQShortString routingKey)
@@ -129,7 +129,7 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQQueue queue)
{
- return _queues.contains(queue);
+ return _queues.containsKey(queue);
}
public boolean hasBindings()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index ef9711004d..bbb009003c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -127,8 +127,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
{
- int _msg;
-
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
@@ -137,6 +135,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
+ entry.incrementDeliveryCount();
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 62dd76f832..0ea88e4ab6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.debug("Rejecting:" + body.getDeliveryTag() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (message == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
if (message.isQueueDeleted())
{
- _logger.warn("Message's Queue as already been purged, unable to Reject. " +
- "Dropping message should use Dead Letter Queue");
+ _logger.warn("Message's Queue has already been purged, dropping message");
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
message.discard();
}
- //sendtoDeadLetterQueue(msg)
return;
}
if (message.getMessage() == null)
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("Message has already been purged, unable to Reject.");
return;
}
@@ -98,27 +94,44 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
_logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
- // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
- //if (!evt.getMethod().resend)
- {
- message.reject();
- }
+ message.reject();
if (body.getRequeue())
{
channel.requeue(deliveryTag);
+
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
}
else
{
- _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- //sendtoDeadLetterQueue(AMQMessage message)
-// message.queue = channel.getDefaultDeadLetterQueue();
-// channel.requeue(deliveryTag);
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 4643dee0a3..20ba3af458 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -72,7 +72,12 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
};
channel.rollback(task);
-
+
+ //Now resend all the unacknowledged messages back to the original subscribers.
+ //(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
+ channel.resend(false);
+
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index ed8c0d0ce9..b5df212904 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control Removed
# 0 - time in milliseconds
OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
+
+DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
+DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
+DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
index db32f13d8d..32bf8aa17d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.message;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
@@ -65,7 +66,6 @@ public class AMQMessage extends AbstractServerMessageImpl
WeakReference<AMQChannel> _channelRef;
-
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
this(handle, null);
@@ -122,7 +122,15 @@ public class AMQMessage extends AbstractServerMessageImpl
public String getRoutingKey()
{
- // TODO
+ MessageMetaData messageMetaData = getMessageMetaData();
+ if (messageMetaData != null)
+ {
+ AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey();
+ if (routingKey != null)
+ {
+ return routingKey.asString();
+ }
+ }
return null;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 9140a13625..6dfdc5e8b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -213,6 +213,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void setAlternateExchange(Exchange exchange);
+ void setAlternateExchange(String exchangeName);
+
Map<String, Object> getArguments();
void checkCapacity(AMQChannel channel);
@@ -272,4 +274,22 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
ManagedObject getManagedObject();
void setExclusive(boolean exclusive) throws AMQException;
+
+ /**
+ * Gets the maximum delivery count. If a message on this queue
+ * is delivered more than maximumDeliveryCount, the message will be
+ * routed to the {@link #getAlternateExchange()} (if set), or otherwise
+ * discarded. 0 indicates that maximum deliver count should not be enforced.
+ *
+ * @return maximum delivery count
+ */
+ int getMaximumDeliveryCount();
+
+ /**
+ * Sets the maximum delivery count.
+ *
+ * @param maximumDeliveryCount maximum delivery count
+ */
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index e4a6f01930..1b15bafb49 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -24,9 +24,15 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
@@ -37,6 +43,11 @@ public class AMQQueueFactory
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+
private abstract static class QueueProperty
{
@@ -80,6 +91,24 @@ public class AMQQueueFactory
}
+ private abstract static class QueueIntegerProperty extends QueueProperty
+ {
+ public QueueIntegerProperty(String argumentName)
+ {
+ super(argumentName);
+ }
+
+ public void setPropertyValue(AMQQueue queue, Object value)
+ {
+ if(value instanceof Number)
+ {
+ setPropertyValue(queue, ((Number)value).intValue());
+ }
+
+ }
+ abstract void setPropertyValue(AMQQueue queue, int value);
+ }
+
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
new QueueLongProperty("x-qpid-maximum-message-age")
{
@@ -122,8 +151,14 @@ public class AMQQueueFactory
{
queue.setFlowResumeCapacity(value);
}
+ },
+ new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ {
+ public void setPropertyValue(AMQQueue queue, int value)
+ {
+ queue.setMaximumDeliveryCount(value);
+ }
}
-
};
@@ -149,8 +184,13 @@ public class AMQQueueFactory
String owner,
boolean autoDelete,
boolean exclusive,
- VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException
+ VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
+ if (queueName == null)
+ {
+ throw new IllegalArgumentException("Queue name must not be null");
+ }
+
// Access check
if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
{
@@ -158,6 +198,13 @@ public class AMQQueueFactory
throw new AMQSecurityException(description);
}
+ QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
+ boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
+ if (isDLQEnabled)
+ {
+ validateDLNames(queueName);
+ }
+
int priorities = 1;
String conflationKey = null;
String sortingKey = null;
@@ -219,10 +266,63 @@ public class AMQQueueFactory
}
}
- return q;
+ if(isDLQEnabled)
+ {
+ final String dlExchangeName = getDeadLetterExchangeName(queueName);
+ final String dlQueueName = getDeadLetterQueueName(queueName);
- }
+ final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ Exchange dlExchange = null;
+ synchronized(exchangeRegistry)
+ {
+ dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+ if(dlExchange == null)
+ {
+ dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+
+ exchangeRegistry.registerExchange(dlExchange);
+
+ //enter the dle in the persistent store
+ virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+ }
+ }
+
+ AMQQueue dlQueue = null;
+
+ synchronized(queueRegistry)
+ {
+ dlQueue = queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
+ final Map<String, Object> args = new HashMap<String, Object>();
+ args.put(X_QPID_DLQ_ENABLED, false);
+ args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
+
+ dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+
+ //enter the dlq in the persistent store
+ virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+ {
+ //actual routing key used does not matter due to use of fanout exchange,
+ //but we will make the key 'dlq' as it can be logged at creation.
+ virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null);
+ }
+ q.setAlternateExchange(dlExchange);
+ }
+ return q;
+ }
public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
{
@@ -250,10 +350,108 @@ public class AMQQueueFactory
arguments = new HashMap<String,Object>();
arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
}
+ if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
+ {
+ if (arguments == null)
+ {
+ arguments = new HashMap<String,Object>();
+ }
+ arguments.put(X_QPID_DLQ_ENABLED, true);
+ }
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
q.configure(config);
return q;
}
+
+ /**
+ * Validates DLQ and DLE names
+ * <p>
+ * DLQ name and DLQ exchange name need to be validated in order to keep
+ * integrity in cases when queue name passes validation check but DLQ name
+ * or DL exchange name fails to pass it. Otherwise, we might have situations
+ * when queue is created but DL exchange or/and DLQ creation fail.
+ * <p>
+ *
+ * @param name
+ * queue name
+ * @throws IllegalArgumentException
+ * thrown if length of queue name or exchange name exceed 255
+ */
+ protected static void validateDLNames(String name)
+ {
+ // check if DLQ name and DLQ exchange name do not exceed 255
+ String exchangeName = getDeadLetterExchangeName(name);
+ if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DL exchange name '" + exchangeName
+ + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ String queueName = getDeadLetterQueueName(name);
+ if (queueName.length() > AMQShortString.MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
+ + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ }
+ }
+
+ /**
+ * Checks if DLQ is enabled for the queue.
+ *
+ * @param autoDelete
+ * queue auto-delete flag
+ * @param arguments
+ * queue arguments
+ * @param qConfig
+ * queue configuration
+ * @return true if DLQ enabled
+ */
+ protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ {
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if (!autoDelete)
+ {
+ boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
+ {
+ boolean dlqEnabled = true;
+ if (dlqArgumentPresent)
+ {
+ Object argument = arguments.get(X_QPID_DLQ_ENABLED);
+ dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ }
+ return dlqEnabled;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Generates a dead letter queue name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DLQ name
+ */
+ protected static String getDeadLetterQueueName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
+ return dlQueueName;
+ }
+
+ /**
+ * Generates a dead letter exchange name for a given queue name
+ *
+ * @param name
+ * queue name
+ * @return DL exchange name
+ */
+ protected static String getDeadLetterExchangeName(String name)
+ {
+ ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
+ String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
+ return dlExchangeName;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index d58d95c801..b4765d6227 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
@@ -80,7 +81,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final String _queueName;
// OpenMBean data types for viewMessages method
- private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types.
+ private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
@@ -139,6 +140,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
_msgAttributeTypes[2] = SimpleType.LONG; // For size
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_msgAttributeTypes[4] = SimpleType.LONG; // For queue position
+ _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count
_messageDataType = new CompositeType("Message", "AMQ Message",
VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]),
@@ -177,6 +179,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _queue.getMessageCount();
}
+ public Integer getMaximumDeliveryCount()
+ {
+ return _queue.getMaximumDeliveryCount();
+ }
+
public Long getMaximumMessageSize()
{
return _queue.getMaximumMessageSize();
@@ -295,6 +302,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
}
}
+ public void setAlternateExchange(String exchangeName)
+ {
+ _queue.setAlternateExchange(exchangeName);
+ }
+
+ public String getAlternateExchange()
+ {
+ Exchange exchange = _queue.getAlternateExchange();
+ String name = exchange == null ? null : exchange.getName();
+ return name == null ? null : name;
+ }
+
/**
* Checks if there is any notification to be send to the listeners
*/
@@ -472,7 +491,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
headerAttributes = getMessageHeaderProperties(headerBody);
- itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else if(serverMsg instanceof MessageTransferMessage)
{
@@ -481,13 +500,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
// Create header attributes list
headerAttributes = getMessageTransferMessageHeaderProps(msg);
- itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
else
{
//unknown message
headerAttributes = new String[]{"N/A"};
- itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position};
+ itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()};
}
CompositeData messageData = new CompositeDataSupport(_messageDataType,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
index 77da08d8c4..26112d9f53 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -24,7 +24,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
-class InboundMessageAdapter implements InboundMessage
+public class InboundMessageAdapter implements InboundMessage
{
private QueueEntry _entry;
@@ -33,7 +33,7 @@ class InboundMessageAdapter implements InboundMessage
{
}
- InboundMessageAdapter(QueueEntry entry)
+ public InboundMessageAdapter(QueueEntry entry)
{
_entry = entry;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index c1fb0258fa..37fad54c07 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -1,5 +1,7 @@
package org.apache.qpid.server.queue;
+import java.util.Collection;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.message.ServerMessage;
@@ -234,4 +236,16 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
* @return true if entry is either DEQUED or DELETED state
*/
boolean isDispensed();
+
+ /**
+ * Number of times this queue entry has been delivered.
+ *
+ * @return delivery count
+ */
+ int getDeliveryCount();
+
+ void incrementDeliveryCount();
+
+ void decrementDeliveryCount();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ee1d214c1f..5bb5dc3462 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -31,10 +31,13 @@ import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -80,6 +83,12 @@ public abstract class QueueEntryImpl implements QueueEntry
private volatile int _deliveryState;
+ /** Number of times this message has been delivered */
+ private volatile int _deliveryCount = 0;
+ private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
+ .newUpdater(QueueEntryImpl.class, "_deliveryCount");
+
+
public QueueEntryImpl(QueueEntryList<?> queueEntryList)
{
@@ -406,50 +415,51 @@ public abstract class QueueEntryImpl implements QueueEntry
public void routeToAlternate()
{
final AMQQueue currentQueue = getQueue();
- Exchange alternateExchange = currentQueue.getAlternateExchange();
+ Exchange alternateExchange = currentQueue.getAlternateExchange();
- if(alternateExchange != null)
+ if (alternateExchange != null)
+ {
+ final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final ServerMessage message = getMessage();
+ if (rerouteQueues != null && rerouteQueues.size() != 0)
{
- final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
- final ServerMessage message = getMessage();
- if(rerouteQueues != null && rerouteQueues.size() != 0)
- {
- ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
- public void postCommit()
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+ txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ try
{
- try
+ for (BaseQueue queue : rerouteQueues)
{
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
+ queue.enqueue(message);
}
}
-
- public void onRollback()
+ catch (AMQException e)
{
-
+ throw new RuntimeException(e);
}
- });
- txn.dequeue(currentQueue,message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- discard();
- }
-
- public void onRollback()
- {
-
- }
- });
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(currentQueue, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
}
}
}
@@ -524,4 +534,19 @@ public abstract class QueueEntryImpl implements QueueEntry
return _state.isDispensed();
}
+ public int getDeliveryCount()
+ {
+ return _deliveryCount;
+ }
+
+ public void incrementDeliveryCount()
+ {
+ _deliveryCountUpdater.incrementAndGet(this);
+ }
+
+ public void decrementDeliveryCount()
+ {
+ _deliveryCountUpdater.decrementAndGet(this);
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ab47d89e01..7717c8ebfc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -187,7 +187,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private long _createTime = System.currentTimeMillis();
private ConfigurationPlugin _queueConfiguration;
-
+ /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
+ private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
@@ -356,6 +357,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_alternateExchange = exchange;
}
+ public void setAlternateExchange(String exchangeName)
+ {
+ if(exchangeName == null || exchangeName.equals(""))
+ {
+ _alternateExchange = null;
+ return;
+ }
+
+ Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
+ if (exchange == null)
+ {
+ throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
+ }
+ setAlternateExchange(exchange);
+ }
+
public Map<String, Object> getArguments()
{
return _arguments;
@@ -521,13 +538,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//Reconfigure the queue for to reflect this new binding.
ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
- }
-
if (config != null)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
+ }
// Reconfigure with new config.
configure(config);
}
@@ -2108,6 +2124,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
+ setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount());
_capacity = ((QueueConfiguration)config).getCapacity();
_flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
}
@@ -2229,4 +2246,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _logActor;
}
+
+ public int getMaximumDeliveryCount()
+ {
+ return _maximumDeliveryCount;
+ }
+
+ public void setMaximumDeliveryCount(final int maximumDeliveryCount)
+ {
+ _maximumDeliveryCount = maximumDeliveryCount;
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
index fdd533b704..7d128f2bc5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
@@ -20,18 +20,31 @@
*/
package org.apache.qpid.server;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
+public class AMQBrokerManagerMBeanTest extends QpidTestCase
{
private QueueRegistry _queueRegistry;
private ExchangeRegistry _exchangeRegistry;
@@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase
assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName)));
}
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the
+ * maximum delivery count to be set on the Queue.
+ */
+ public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception
+ {
+ final Map<String,Object> args = new HashMap<String, Object>();
+ args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+ final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount");
+
+ final QueueRegistry qReg = _vHost.getQueueRegistry();
+
+ assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+ final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+ mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+ final AMQQueue createdQueue = qReg.getQueue(queueName);
+ assertNotNull("The queue was not registered as expected", createdQueue);
+ assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount());
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of
+ * a Priority Queue.
+ */
+ public void testCreatePriorityQueue() throws Exception
+ {
+ int numPriorities = 7;
+ Map<String,Object> args = new HashMap<String, Object>();
+ args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities);
+
+ AMQShortString queueName = new AMQShortString("testCreatePriorityQueue");
+
+ QueueRegistry qReg = _vHost.getQueueRegistry();
+
+ assertNull("The queue should not yet exist", qReg.getQueue(queueName));
+
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
+ mbean.createNewQueue(queueName.asString(), "test", false, args);
+
+ AMQQueue queue = qReg.getQueue(queueName);
+ assertEquals("Queue is not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities());
+ }
+
@Override
public void setUp() throws Exception
{
super.setUp();
+
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ XMLConfiguration configXml = new XMLConfiguration();
+ configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
+ configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
+
+ ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+ ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName("test");
+
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
_vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
_queueRegistry = _vHost.getQueueRegistry();
_exchangeRegistry = _vHost.getExchangeRegistry();
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
index 9941c00499..e1a5e7d338 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
@@ -24,6 +24,8 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
public class QueueConfigurationTest extends TestCase
{
@@ -43,11 +45,71 @@ public class QueueConfigurationTest extends TestCase
fullEnv.setProperty("queues.maximumMessageSize", 1);
fullEnv.setProperty("queues.maximumMessageCount", 1);
fullEnv.setProperty("queues.minimumAlertRepeatGap", 1);
+ fullEnv.setProperty("queues.deadLetterQueues", true);
+ fullEnv.setProperty("queues.maximumDeliveryCount", 5);
_fullHostConf = new VirtualHostConfiguration("test", fullEnv);
}
+ public void testMaxDeliveryCount() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount());
+
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ /**
+ * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden
+ * at a broker or virtualhost level.
+ * @throws Exception
+ */
+ public void testIsDeadLetterQueueEnabled() throws Exception
+ {
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled());
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
public void testGetMaximumMessageAge() throws ConfigurationException
{
// Check default value
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 9ee2ed3812..7739f9976e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.Writer;
import java.util.Locale;
import org.apache.commons.configuration.ConfigurationException;
@@ -1459,4 +1460,117 @@ public class ServerConfigurationTest extends QpidTestCase
ce.getMessage());
}
}
+
+ public void testMaxDeliveryCountDefault() throws Exception
+ {
+ final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+ assertEquals(0, serverConfig.getMaxDeliveryCount());
+ }
+
+ public void testMaxDeliveryCount() throws Exception
+ {
+ _config.setProperty("maximumDeliveryCount", 5);
+ final ServerConfiguration serverConfig = new ServerConfiguration(_config);
+ assertEquals(5, serverConfig.getMaxDeliveryCount());
+ }
+
+ /**
+ * Test XML configuration file correctly enables dead letter queues
+ */
+ public void testDeadLetterQueueConfigurationFile() throws Exception
+ {
+ // Write config
+ File xml = File.createTempFile(getClass().getName(), "xml");
+ xml.deleteOnExit();
+ FileWriter config = new FileWriter(xml);
+ config.write("<broker>\n");
+ writeSecurity(config);
+ config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+ config.write("<virtualhosts>\n");
+ config.write("<virtualhost>\n");
+ config.write("<name>test</name>\n");
+ config.write("<test>\n");
+ config.write("<queues>\n");
+ config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+ config.write("<queue>\n");
+ config.write("<name>biggles</name>\n");
+ config.write("<biggles>\n");
+ config.write("<deadLetterQueues>true</deadLetterQueues>\n");
+ config.write("</biggles>\n");
+ config.write("</queue>\n");
+ config.write("<queue>\n");
+ config.write("<name>beetle</name>\n");
+ config.write("<beetle />\n");
+ config.write("</queue>\n");
+ config.write("</queues>\n");
+ config.write("</test>\n");
+ config.write("</virtualhost>\n");
+ config.write("<virtualhost>\n");
+ config.write("<name>extra</name>\n");
+ config.write("<extra>\n");
+ config.write("<queues>\n");
+ config.write("<queue>\n");
+ config.write("<name>r2d2</name>\n");
+ config.write("<r2d2>\n");
+ config.write("<deadLetterQueues>false</deadLetterQueues>\n");
+ config.write("</r2d2>\n");
+ config.write("</queue>\n");
+ config.write("<queue>\n");
+ config.write("<name>c3p0</name>\n");
+ config.write("<c3p0 />\n");
+ config.write("</queue>\n");
+ config.write("</queues>\n");
+ config.write("</extra>\n");
+ config.write("</virtualhost>\n");
+ config.write("</virtualhosts>\n");
+ config.write("</broker>\n");
+ config.close();
+
+ // Load config
+ ApplicationRegistry.remove();
+ ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml);
+ ApplicationRegistry.initialise(registry);
+ ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration();
+
+ VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test");
+ assertNotNull("Host 'test' is not found", test);
+ VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra");
+ assertNotNull("Host 'extra' is not found", test);
+
+ QueueConfiguration biggles = test.getQueueConfiguration("biggles");
+ QueueConfiguration beetle = test.getQueueConfiguration("beetle");
+ QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2");
+ QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0");
+
+ // Validate config
+ assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled());
+ assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled());
+ assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled());
+ assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled());
+ assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled());
+ assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled());
+ assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled());
+ }
+
+ /**
+ * Convenience method to output required security preamble for broker config
+ */
+ private void writeSecurity(Writer out) throws Exception
+ {
+ out.write("\t<management><enabled>false</enabled></management>\n");
+ out.write("\t<security>\n");
+ out.write("\t\t<pd-auth-manager>\n");
+ out.write("\t\t\t<principal-database>\n");
+ out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n");
+ out.write("\t\t\t\t<attributes>\n");
+ out.write("\t\t\t\t\t<attribute>\n");
+ out.write("\t\t\t\t\t\t<name>passwordFile</name>\n");
+ out.write("\t\t\t\t\t\t<value>/dev/null</value>\n");
+ out.write("\t\t\t\t\t</attribute>\n");
+ out.write("\t\t\t\t</attributes>\n");
+ out.write("\t\t\t</principal-database>\n");
+ out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n");
+ out.write("\t\t</pd-auth-manager>\n");
+ out.write("\t</security>\n");
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index b133d53ac5..f6cd397217 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -21,7 +21,6 @@ package org.apache.qpid.server.configuration;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
@@ -34,19 +33,6 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
{
@Override
- public void setUp() throws Exception
- {
- super.setUp();
- // Set the default configuration items
- getConfigXml().clear();
- getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test");
- getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
-
- getConfigXml().addProperty("virtualhosts.virtualhost.name", getName());
- getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
- }
-
- @Override
public void createBroker()
{
// Prevent auto broker startup
@@ -134,6 +120,88 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
assertEquals(3, bTest.getMaximumMessageAge());
}
+ public void testMaxDeliveryCount() throws Exception
+ {
+ // Set up vhosts and queues
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5);
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4);
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+ // Start the broker now.
+ super.createBroker();
+
+ // Get vhosts
+ VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+
+ // Enabled specifically
+ assertEquals("Test vhost MDC was configured as enabled", 5 ,test.getConfiguration().getMaxDeliveryCount());
+
+ // Enabled by test vhost default
+ assertEquals("beetle queue DLQ was configured as enabled", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount());
+
+ // Disabled specifically
+ assertEquals("Biggles queue DLQ was configured as disabled", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount());
+ }
+
+ /**
+ * Tests the full set of configuration options for enabling DLQs in the broker configuration.
+ */
+ public void testIsDeadLetterQueueEnabled() throws Exception
+ {
+ // Set up vhosts and queues
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle");
+
+
+ getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0");
+ getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName());
+
+ // Start the broker now.
+ super.createBroker();
+
+ // Get vhosts
+ VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName());
+ VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra");
+
+ // Enabled specifically
+ assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled());
+ assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled());
+
+ // Enabled by test vhost default
+ assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled());
+
+ // Disabled specifically
+ assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled());
+
+ // Using broker default of disabled
+ assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled());
+ assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
+
+ // Get queues
+ AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
+ AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
+ AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
+ AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+
+ // Disabled specifically for this queue, overriding virtualhost setting
+ assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
+
+ // Enabled for all queues on the virtualhost
+ assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange());
+
+ // Enabled specifically for this queue, overriding the default broker setting of disabled
+ assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange());
+
+ // Disabled by the default broker setting
+ assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange());
+ }
+
/**
* Test that the house keeping pool sizes is correctly processed
*
@@ -173,7 +241,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
vhost.getHouseKeepingTaskCount());
// Currently the two are tasks:
- // ExpiredMessageTask from VirtualHost
+ // ExpiredMessageTask from VirtualHost
// UpdateTask from the QMF ManagementExchange
}
@@ -214,7 +282,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase
{
getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name",
"testdb");
-
+
try
{
super.createBroker();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7b73987abf..ea2fe90da6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
{
return null;
}
+
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ }
};
if(action != null)
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 27891289fb..2b7d1d7f26 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,39 +20,76 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
-public class AMQQueueFactoryTest extends InternalBrokerBaseCase
+public class AMQQueueFactoryTest extends QpidTestCase
{
QueueRegistry _queueRegistry;
VirtualHost _virtualHost;
- int _defaultQueueCount;
@Override
public void setUp() throws Exception
{
super.setUp();
- ApplicationRegistry registry = (ApplicationRegistry) ApplicationRegistry.getInstance();
- _virtualHost = registry.getVirtualHostRegistry().getVirtualHost("test");
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+
+ XMLConfiguration configXml = new XMLConfiguration();
+ configXml.addProperty("virtualhosts.virtualhost(-1).name", getName());
+ configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
+
+ ServerConfiguration configuration = new ServerConfiguration(configXml);
+
+ ApplicationRegistry registry = new TestApplicationRegistry(configuration);
+ ApplicationRegistry.initialise(registry);
+ registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName());
+
+ _virtualHost = registry.getVirtualHostRegistry().getVirtualHost(getName());
_queueRegistry = _virtualHost.getQueueRegistry();
- _defaultQueueCount = _queueRegistry.getQueues().size();
}
@Override
public void tearDown() throws Exception
{
- assertEquals("Queue was not registered in virtualhost", _defaultQueueCount + 1, _queueRegistry.getQueues().size());
- super.tearDown();
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
+ }
+
+ private void verifyRegisteredQueueCount(int count)
+ {
+ assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size());
}
+ private void verifyQueueRegistered(String queueName)
+ {
+ assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName));
+ }
+
public void testPriorityQueueRegistration() throws Exception
{
FieldTable fieldTable = new FieldTable();
@@ -63,13 +100,314 @@ public class AMQQueueFactoryTest extends InternalBrokerBaseCase
false, _virtualHost, fieldTable);
assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ verifyQueueRegistered("testPriorityQueue");
+ verifyRegisteredQueueCount(1);
}
public void testSimpleQueueRegistration() throws Exception
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+ AMQShortString queueName = new AMQShortString("testSimpleQueueRegistration");
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false,
false, _virtualHost, null);
assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+ verifyQueueRegistered("testSimpleQueueRegistration");
+
+ //verify that no alternate exchange or DLQ were produced
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+
+ assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange());
+ assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName));
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does
+ * cause the alternate exchange to be set and DLQ to be produced.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueEnabled() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ Exchange altExchange = queue.getAlternateExchange();
+ assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
+
+ assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+
+ AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ assertNotNull("The DLQ was not registered as expected", dlQueue);
+ assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
+ assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount());
+
+ //2 queues should have been registered
+ verifyRegisteredQueueCount(2);
+ }
+
+ /**
+ * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration
+ * are not applied to the DLQ itself.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws AMQException
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("deadLetterQueues","true");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("maximumDeliveryCount","5");
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, null);
+
+ assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
+ Exchange altExchange = queue.getAlternateExchange();
+ assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
+ assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
+ assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName());
+
+ assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName));
+ assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName));
+
+ AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ assertNotNull("The DLQ was not registered as expected", dlQueue);
+ assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
+ assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
+ assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount());
+
+ //2 queues should have been registered
+ verifyRegisteredQueueCount(2);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not
+ * result in the alternate exchange being set and DLQ being created.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueDisabled() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueDisabled");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
+ assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName));
+
+ assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
+
+ //only 1 queue should have been registered
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but
+ * creating an auto-delete queue, does not result in the alternate exchange
+ * being set and DLQ being created.
+ * @throws AMQException
+ */
+ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException
+ {
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+
+ AMQShortString queueName = new AMQShortString("testDeadLetterQueueNotCreatedForAutodeleteQueues");
+ AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+
+ QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ ExchangeRegistry exReg = _virtualHost.getExchangeRegistry();
+
+ assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName));
+
+ //create an autodelete queue
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), true, false,
+ _virtualHost, fieldTable);
+ assertTrue("Queue should be autodelete", queue.isAutoDelete());
+
+ //ensure that the autodelete property overrides the request to enable DLQ
+ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
+ assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName));
+ assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
+
+ //only 1 queue should have been registered
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * the desired effect.
+ */
+ public void testMaximumDeliveryCount() throws Exception
+ {
+ final FieldTable fieldTable = new FieldTable();
+ fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+
+ final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount");
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, fieldTable);
+
+ assertNotNull("The queue was not registered as expected ", queue);
+ assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * that queue is created with a default maximumDeliveryCount of zero (unless set in config).
+ */
+ public void testMaximumDeliveryCountDefault() throws Exception
+ {
+
+ final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount");
+
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false,
+ _virtualHost, null);
+
+ assertNotNull("The queue was not registered as expected ", queue);
+ assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
+
+ verifyRegisteredQueueCount(1);
+ }
+
+ /**
+ * Tests queue creation with queue name set to null
+ */
+ public void testQueueNameNullValidation()
+ {
+ try
+ {
+ AMQQueueFactory.createAMQQueueImpl(null, false, new AMQShortString("owner"), true, false, _virtualHost, null);
+ fail("queue with null name can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof IllegalArgumentException);
+ assertEquals("Queue name must not be null", e.getMessage());
+ }
+ }
+
+ /**
+ * Tests queue creation with queue name length less 255 characters but
+ * corresponding DLQ name length greater than 255.
+ */
+ public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255()
+ {
+ String queueName = "test-" + generateStringWithLength('a', 245);
+ try
+ {
+ // change DLQ name to make its length bigger than exchange name
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", "_DLE");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", "_DLQUEUE");
+
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"),
+ false, false, _virtualHost, fieldTable);
+ fail("queue with DLQ name having more than 255 characters can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
+ assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name")
+ && e.getMessage().contains("length exceeds limit of 255"));
+ }
+ finally
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+
+ /**
+ * Tests queue creation with queue name length less 255 characters but
+ * corresponding DL exchange name length greater than 255.
+ */
+ public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255()
+ {
+ String queueName = "test-" + generateStringWithLength('a', 245);
+ try
+ {
+ // change DLQ name to make its length bigger than exchange name
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", "_DLEXCHANGE");
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", "_DLQ");
+
+ FieldTable fieldTable = new FieldTable();
+ fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"),
+ false, false, _virtualHost, fieldTable);
+ fail("queue with DLE name having more than 255 characters can not be created!");
+ }
+ catch (Exception e)
+ {
+ assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
+ assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name")
+ && e.getMessage().contains("length exceeds limit of 255"));
+ }
+ finally
+ {
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ ApplicationRegistry.getInstance().getConfiguration().getConfig()
+ .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
+ }
+
+ private String generateStringWithLength(char ch, int length)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++)
+ {
+ sb.append(ch);
+ }
+ return sb.toString();
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 8f3023f269..f70250132a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -44,6 +44,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -390,6 +391,34 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertFalse(channel.getBlocking());
}
+ public void testMaximumDeliveryCount() throws IOException
+ {
+ assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount());
+ }
+
+ public void testViewAllMessages() throws Exception
+ {
+ final int messageCount = 5;
+ sendPersistentMessages(messageCount);
+
+
+ final TabularData messageTable = _queueMBean.viewMessages(1L, 5L);
+ assertNotNull("Message table should not be null", messageTable);
+ assertEquals("Unexpected number of rows", messageCount, messageTable.size());
+
+
+ final Iterator rowIterator = messageTable.values().iterator();
+ // Get its message ID
+ final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next();
+ final Long msgId = (Long) row1.get("AMQ MessageId");
+ final Long queuePosition = (Long) row1.get("Queue Position");
+ final Integer deliveryCount = (Integer) row1.get("Delivery Count");
+
+ assertNotNull("Row should have value for queue position", queuePosition);
+ assertNotNull("Row should have value for msgid", msgId);
+ assertNotNull("Row should have value for deliveryCount", deliveryCount);
+ }
+
@Override
public void setUp() throws Exception
@@ -404,6 +433,13 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ApplicationRegistry.remove();
}
+ private void sendPersistentMessages(int messageCount) throws AMQException
+ {
+ sendMessages(messageCount, true);
+ assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean
+ .getMessageCount().intValue());
+ }
+
private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
{
return sendMessages(messageCount, persistent, 0l, 0l);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 4c31092983..0daf79122c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -611,4 +611,21 @@ public class MockAMQQueue implements AMQQueue
{
}
+
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setMaximumDeliveryCount(int maximumDeliveryCount)
+ {
+ }
+
+ @Override
+ public void setAlternateExchange(String exchangeName)
+ {
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 864b9ad368..7ad002c248 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -244,4 +244,21 @@ public class MockQueueEntry implements QueueEntry
return null;
}
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ }
+
+
}