diff options
Diffstat (limited to 'java/broker')
28 files changed, 1355 insertions, 113 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 76b6dad996..01a0d9900d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -20,8 +20,8 @@ package org.apache.qpid.server; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Map; import javax.management.JMException; import javax.management.MBeanException; @@ -30,6 +30,7 @@ import javax.management.ObjectName; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; @@ -243,7 +244,13 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr */ public void createNewQueue(String queueName, String owner, boolean durable) throws JMException, MBeanException { - AMQQueue queue = _queueRegistry.getQueue(new AMQShortString(queueName)); + createNewQueue(queueName, owner, durable, null); + } + + public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException + { + final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName); + AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString); if (queue != null) { throw new JMException("The queue \"" + queueName + "\" already exists."); @@ -258,11 +265,18 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr ownerShortString = new AMQShortString(owner); } + FieldTable args = null; + if(arguments != null) + { + args = FieldTable.convertToFieldTable(arguments); + } final VirtualHost virtualHost = getVirtualHost(); - queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), durable, ownerShortString, false, false, virtualHost, null); + + queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString, + false, false, getVirtualHost(), args); if (queue.isDurable() && !queue.isAutoDelete()) { - _durableConfig.createQueue(queue); + _durableConfig.createQueue(queue, args); } virtualHost.getBindingFactory().addBinding(queueName, queue, _defaultExchange, null); diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 34bc57a826..a4fd997568 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -22,7 +22,6 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQMethodBody; @@ -53,6 +52,7 @@ import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -692,6 +693,31 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } + public boolean isMaxDeliveryCountEnabled(final long deliveryTag) + { + final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + if (queueEntry != null) + { + final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + return maximumDeliveryCount > 0; + } + + return false; + } + + public boolean isDeliveredTooManyTimes(final long deliveryTag) + { + final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag); + if (queueEntry != null) + { + final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount(); + final int numDeliveries = queueEntry.getDeliveryCount(); + return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount; + } + + return false; + } + /** * Called to resend all outstanding unacknowledged messages to this same channel. * @@ -739,9 +765,9 @@ public class AMQChannel implements SessionConfig, AMQSessionModel QueueEntry message = entry.getValue(); long deliveryTag = entry.getKey(); + //Amend the delivery counter as the client hasn't seen these messages yet. + message.decrementDeliveryCount(); - - ServerMessage msg = message.getMessage(); AMQQueue queue = message.getQueue(); // Our Java Client will always suspend the channel when resending! @@ -799,6 +825,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { QueueEntry message = entry.getValue(); long deliveryTag = entry.getKey(); + + //Amend the delivery counter as the client hasn't seen these messages yet. + message.decrementDeliveryCount(); + _unacknowledgedMessageMap.remove(deliveryTag); message.setRedelivered(); @@ -1058,6 +1088,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel _session.registerMessageDelivered(entry.getMessage().getSize()); getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); + entry.incrementDeliveryCount(); } }; @@ -1246,7 +1277,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel { private final Collection<QueueEntry> _ackedMessages; - public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages) { _ackedMessages = ackedMessages; @@ -1479,4 +1509,54 @@ public class AMQChannel implements SessionConfig, AMQSessionModel } } } + + public void deadLetter(long deliveryTag) throws AMQException + { + final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); + final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); + + if (rejectedQueueEntry == null) + { + _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); + return; + } + else + { + final ServerMessage msg = rejectedQueueEntry.getMessage(); + + final AMQQueue queue = rejectedQueueEntry.getQueue(); + + final Exchange altExchange = queue.getAlternateExchange(); + unackedMap.remove(deliveryTag); + + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + rejectedQueueEntry.discard(); + return; + } + + final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); + + final ArrayList<? extends BaseQueue> destinationQueues = altExchange.route(m); + + if (destinationQueues == null || destinationQueues.isEmpty()) + { + _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + rejectedQueueEntry.discard(); + return; + } + + rejectedQueueEntry.routeToAlternate(); + + //output operational logging for each delivery post commit + for (final BaseQueue destinationQueue : destinationQueues) + { + _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); + } + + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java index 31c683b548..b8437c8430 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java @@ -63,7 +63,9 @@ public class QueueConfiguration extends ConfigurationPlugin "flowResumeCapacity", "lvq", "lvqKey", - "sortKey" + "sortKey", + "maximumDeliveryCount", + "deadLetterQueues" }; } @@ -173,6 +175,19 @@ public class QueueConfiguration extends ConfigurationPlugin return getStringValue("sortKey", null); } + public int getMaxDeliveryCount() + { + return getIntValue("maximumDeliveryCount", _vHostConfig.getMaxDeliveryCount()); + } + + /** + * Check if dead letter queue delivery is enabled, deferring to the virtualhost configuration if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled()); + } + public static class QueueConfig extends ConfigurationPlugin { @Override diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index e13e2f4d8f..4b42e39aa1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -40,6 +40,8 @@ import org.apache.commons.configuration.SystemConfiguration; import org.apache.commons.configuration.XMLConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.signal.SignalHandlerTask; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -810,4 +812,34 @@ public class ServerConfiguration extends ConfigurationPlugin { return getBooleanValue("management.managementRightsInferAllAccess", true); } + + public int getMaxDeliveryCount() + { + return getConfig().getInt("maximumDeliveryCount", 0); + } + + /** + * Check if dead letter queue delivery is enabled, defaults to disabled if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getConfig().getBoolean("deadLetterQueues", false); + } + + /** + * String to affix to end of queue name when generating an alternate exchange for DLQ purposes. + */ + public String getDeadLetterExchangeSuffix() + { + return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + } + + /** + * String to affix to end of queue name when generating a queue for DLQ purposes. + */ + public String getDeadLetterQueueSuffix() + { + return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 6729a5ce0f..c4e4f701a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -347,4 +347,18 @@ public class VirtualHostConfiguration extends ConfigurationPlugin { return getLongValue("transactionTimeout.idleClose", 0L); } + + public int getMaxDeliveryCount() + { + return getIntValue("queues.maximumDeliveryCount", ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount()); + } + + /** + * Check if dead letter queue delivery is enabled, deferring to the broker configuration if not set. + */ + public boolean isDeadLetterQueueEnabled() + { + return getBooleanValue("queues.deadLetterQueues", ApplicationRegistry.getInstance().getConfiguration().isDeadLetterQueueEnabled()); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 7837a9bc38..102cd42ac3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchangeFactory implements ExchangeFactory { private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class); + public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE"; private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>(); private final VirtualHost _host; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index bd75f7bc51..76f86ea1b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -117,7 +117,7 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - return _queues.contains(queue); + return _queues.containsKey(queue); } public boolean isBound(AMQShortString routingKey) @@ -129,7 +129,7 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQQueue queue) { - return _queues.contains(queue); + return _queues.containsKey(queue); } public boolean hasBindings() diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index ef9711004d..bbb009003c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -127,8 +127,6 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod() { - int _msg; - public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { @@ -137,6 +135,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB { session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(), deliveryTag, queue.getMessageCount()); + entry.incrementDeliveryCount(); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 62dd76f832..0ea88e4ab6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -59,7 +59,6 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting:" + body.getDeliveryTag() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } @@ -70,26 +69,23 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (message == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); -// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { if (message.isQueueDeleted()) { - _logger.warn("Message's Queue as already been purged, unable to Reject. " + - "Dropping message should use Dead Letter Queue"); + _logger.warn("Message's Queue has already been purged, dropping message"); message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); if(message != null) { message.discard(); } - //sendtoDeadLetterQueue(msg) return; } if (message.getMessage() == null) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("Message has already been purged, unable to Reject."); return; } @@ -98,27 +94,44 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + body.getRequeue() + - //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); } - // If we haven't requested message to be resent to this consumer then reject it from ever getting it. - //if (!evt.getMethod().resend) - { - message.reject(); - } + message.reject(); if (body.getRequeue()) { channel.requeue(deliveryTag); + + //this requeue represents a message rejected from the pre-dispatch queue + //therefore we need to amend the delivery counter. + message.decrementDeliveryCount(); } else { - _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - //sendtoDeadLetterQueue(AMQMessage message) -// message.queue = channel.getDefaultDeadLetterQueue(); -// channel.requeue(deliveryTag); + final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); + _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag); + if (maxDeliveryCountEnabled) + { + final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); + _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag); + if (deliveredTooManyTimes) + { + channel.deadLetter(body.getDeliveryTag()); + } + else + { + //this requeue represents a message rejected because of a recover/rollback that we + //are not ready to DLQ. We rely on the reject command to resend from the unacked map + //and therefore need to increment the delivery counter so we cancel out the effect + //of the AMQChannel#resend() decrement. + message.incrementDeliveryCount(); + } + } + else + { + channel.deadLetter(body.getDeliveryTag()); + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 4643dee0a3..20ba3af458 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -72,7 +72,12 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod }; channel.rollback(task); - + + //Now resend all the unacknowledged messages back to the original subscribers. + //(Must be done after the TxnRollback-ok response). + // Why, are we not allowed to send messages back to client before the ok method? + channel.resend(false); + } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index ed8c0d0ce9..b5df212904 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -32,3 +32,7 @@ FLOW_REMOVED = CHN-1006 : Flow Control Removed # 0 - time in milliseconds OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms + +DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2} +DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1} +DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1} diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java index db32f13d8d..32bf8aa17d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.message; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; @@ -65,7 +66,6 @@ public class AMQMessage extends AbstractServerMessageImpl WeakReference<AMQChannel> _channelRef; - public AMQMessage(StoredMessage<MessageMetaData> handle) { this(handle, null); @@ -122,7 +122,15 @@ public class AMQMessage extends AbstractServerMessageImpl public String getRoutingKey() { - // TODO + MessageMetaData messageMetaData = getMessageMetaData(); + if (messageMetaData != null) + { + AMQShortString routingKey = messageMetaData.getMessagePublishInfo().getRoutingKey(); + if (routingKey != null) + { + return routingKey.asString(); + } + } return null; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 9140a13625..6dfdc5e8b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -213,6 +213,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer void setAlternateExchange(Exchange exchange); + void setAlternateExchange(String exchangeName); + Map<String, Object> getArguments(); void checkCapacity(AMQChannel channel); @@ -272,4 +274,22 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer ManagedObject getManagedObject(); void setExclusive(boolean exclusive) throws AMQException; + + /** + * Gets the maximum delivery count. If a message on this queue + * is delivered more than maximumDeliveryCount, the message will be + * routed to the {@link #getAlternateExchange()} (if set), or otherwise + * discarded. 0 indicates that maximum deliver count should not be enforced. + * + * @return maximum delivery count + */ + int getMaximumDeliveryCount(); + + /** + * Sets the maximum delivery count. + * + * @param maximumDeliveryCount maximum delivery count + */ + public void setMaximumDeliveryCount(final int maximumDeliveryCount); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index e4a6f01930..1b15bafb49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -24,9 +24,15 @@ import java.util.HashMap; import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory @@ -37,6 +43,11 @@ public class AMQQueueFactory public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + public static final String DLQ_ROUTING_KEY = "dlq"; + public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; + public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + private abstract static class QueueProperty { @@ -80,6 +91,24 @@ public class AMQQueueFactory } + private abstract static class QueueIntegerProperty extends QueueProperty + { + public QueueIntegerProperty(String argumentName) + { + super(argumentName); + } + + public void setPropertyValue(AMQQueue queue, Object value) + { + if(value instanceof Number) + { + setPropertyValue(queue, ((Number)value).intValue()); + } + + } + abstract void setPropertyValue(AMQQueue queue, int value); + } + private static final QueueProperty[] DECLAREABLE_PROPERTIES = { new QueueLongProperty("x-qpid-maximum-message-age") { @@ -122,8 +151,14 @@ public class AMQQueueFactory { queue.setFlowResumeCapacity(value); } + }, + new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT) + { + public void setPropertyValue(AMQQueue queue, int value) + { + queue.setMaximumDeliveryCount(value); + } } - }; @@ -149,8 +184,13 @@ public class AMQQueueFactory String owner, boolean autoDelete, boolean exclusive, - VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException + VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException { + if (queueName == null) + { + throw new IllegalArgumentException("Queue name must not be null"); + } + // Access check if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner)) { @@ -158,6 +198,13 @@ public class AMQQueueFactory throw new AMQSecurityException(description); } + QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName); + boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration); + if (isDLQEnabled) + { + validateDLNames(queueName); + } + int priorities = 1; String conflationKey = null; String sortingKey = null; @@ -219,10 +266,63 @@ public class AMQQueueFactory } } - return q; + if(isDLQEnabled) + { + final String dlExchangeName = getDeadLetterExchangeName(queueName); + final String dlQueueName = getDeadLetterQueueName(queueName); - } + final ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + final ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + final QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + Exchange dlExchange = null; + synchronized(exchangeRegistry) + { + dlExchange = exchangeRegistry.getExchange(dlExchangeName); + + if(dlExchange == null) + { + dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); + + exchangeRegistry.registerExchange(dlExchange); + + //enter the dle in the persistent store + virtualHost.getDurableConfigurationStore().createExchange(dlExchange); + } + } + + AMQQueue dlQueue = null; + + synchronized(queueRegistry) + { + dlQueue = queueRegistry.getQueue(dlQueueName); + + if(dlQueue == null) + { + //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc + final Map<String, Object> args = new HashMap<String, Object>(); + args.put(X_QPID_DLQ_ENABLED, false); + args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); + + dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args); + + //enter the dlq in the persistent store + virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); + } + } + + //ensure the queue is bound to the exchange + if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue)) + { + //actual routing key used does not matter due to use of fanout exchange, + //but we will make the key 'dlq' as it can be logged at creation. + virtualHost.getBindingFactory().addBinding(DLQ_ROUTING_KEY, dlQueue, dlExchange, null); + } + q.setAlternateExchange(dlExchange); + } + return q; + } public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException { @@ -250,10 +350,108 @@ public class AMQQueueFactory arguments = new HashMap<String,Object>(); arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); } + if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) + { + if (arguments == null) + { + arguments = new HashMap<String,Object>(); + } + arguments.put(X_QPID_DLQ_ENABLED, true); + } AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); q.configure(config); return q; } + + /** + * Validates DLQ and DLE names + * <p> + * DLQ name and DLQ exchange name need to be validated in order to keep + * integrity in cases when queue name passes validation check but DLQ name + * or DL exchange name fails to pass it. Otherwise, we might have situations + * when queue is created but DL exchange or/and DLQ creation fail. + * <p> + * + * @param name + * queue name + * @throws IllegalArgumentException + * thrown if length of queue name or exchange name exceed 255 + */ + protected static void validateDLNames(String name) + { + // check if DLQ name and DLQ exchange name do not exceed 255 + String exchangeName = getDeadLetterExchangeName(name); + if (exchangeName.length() > AMQShortString.MAX_LENGTH) + { + throw new IllegalArgumentException("DL exchange name '" + exchangeName + + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name); + } + String queueName = getDeadLetterQueueName(name); + if (queueName.length() > AMQShortString.MAX_LENGTH) + { + throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + + AMQShortString.MAX_LENGTH + " characters for queue " + name); + } + } + + /** + * Checks if DLQ is enabled for the queue. + * + * @param autoDelete + * queue auto-delete flag + * @param arguments + * queue arguments + * @param qConfig + * queue configuration + * @return true if DLQ enabled + */ + protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + { + //feature is not to be enabled for temporary queues or when explicitly disabled by argument + if (!autoDelete) + { + boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED); + if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled()) + { + boolean dlqEnabled = true; + if (dlqArgumentPresent) + { + Object argument = arguments.get(X_QPID_DLQ_ENABLED); + dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue(); + } + return dlqEnabled; + } + } + return false; + } + + /** + * Generates a dead letter queue name for a given queue name + * + * @param name + * queue name + * @return DLQ name + */ + protected static String getDeadLetterQueueName(String name) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix(); + return dlQueueName; + } + + /** + * Generates a dead letter exchange name for a given queue name + * + * @param name + * queue name + * @return DL exchange name + */ + protected static String getDeadLetterExchangeName(String name) + { + ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration(); + String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix(); + return dlExchangeName; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index d58d95c801..b4765d6227 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; @@ -80,7 +81,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final String _queueName; // OpenMBean data types for viewMessages method - private static OpenType[] _msgAttributeTypes = new OpenType[5]; // AMQ message attribute types. + private static OpenType[] _msgAttributeTypes = new OpenType[6]; // AMQ message attribute types. private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data. private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list. @@ -139,6 +140,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que _msgAttributeTypes[2] = SimpleType.LONG; // For size _msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered _msgAttributeTypes[4] = SimpleType.LONG; // For queue position + _msgAttributeTypes[5] = SimpleType.INTEGER; // For delivery count _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.toArray(new String[VIEW_MSGS_COMPOSITE_ITEM_NAMES_DESC.size()]), @@ -177,6 +179,11 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.getMessageCount(); } + public Integer getMaximumDeliveryCount() + { + return _queue.getMaximumDeliveryCount(); + } + public Long getMaximumMessageSize() { return _queue.getMaximumMessageSize(); @@ -295,6 +302,18 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que } } + public void setAlternateExchange(String exchangeName) + { + _queue.setAlternateExchange(exchangeName); + } + + public String getAlternateExchange() + { + Exchange exchange = _queue.getAlternateExchange(); + String name = exchange == null ? null : exchange.getName(); + return name == null ? null : name; + } + /** * Checks if there is any notification to be send to the listeners */ @@ -472,7 +491,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list headerAttributes = getMessageHeaderProperties(headerBody); - itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position}; + itemValues = new Object[]{msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()}; } else if(serverMsg instanceof MessageTransferMessage) { @@ -481,13 +500,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // Create header attributes list headerAttributes = getMessageTransferMessageHeaderProps(msg); - itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position}; + itemValues = new Object[]{msg.getMessageNumber(), headerAttributes, msg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()}; } else { //unknown message headerAttributes = new String[]{"N/A"}; - itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position}; + itemValues = new Object[]{serverMsg.getMessageNumber(), headerAttributes, serverMsg.getSize(), queueEntry.isRedelivered(), position, queueEntry.getDeliveryCount()}; } CompositeData messageData = new CompositeDataSupport(_messageDataType, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java index 77da08d8c4..26112d9f53 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java @@ -24,7 +24,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.AMQMessageHeader; -class InboundMessageAdapter implements InboundMessage +public class InboundMessageAdapter implements InboundMessage { private QueueEntry _entry; @@ -33,7 +33,7 @@ class InboundMessageAdapter implements InboundMessage { } - InboundMessageAdapter(QueueEntry entry) + public InboundMessageAdapter(QueueEntry entry) { _entry = entry; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index c1fb0258fa..37fad54c07 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -1,5 +1,7 @@ package org.apache.qpid.server.queue; +import java.util.Collection; + import org.apache.qpid.AMQException; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.message.ServerMessage; @@ -234,4 +236,16 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable * @return true if entry is either DEQUED or DELETED state */ boolean isDispensed(); + + /** + * Number of times this queue entry has been delivered. + * + * @return delivery count + */ + int getDeliveryCount(); + + void incrementDeliveryCount(); + + void decrementDeliveryCount(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ee1d214c1f..5bb5dc3462 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -31,10 +31,13 @@ import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -80,6 +83,12 @@ public abstract class QueueEntryImpl implements QueueEntry private volatile int _deliveryState; + /** Number of times this message has been delivered */ + private volatile int _deliveryCount = 0; + private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater + .newUpdater(QueueEntryImpl.class, "_deliveryCount"); + + public QueueEntryImpl(QueueEntryList<?> queueEntryList) { @@ -406,50 +415,51 @@ public abstract class QueueEntryImpl implements QueueEntry public void routeToAlternate() { final AMQQueue currentQueue = getQueue(); - Exchange alternateExchange = currentQueue.getAlternateExchange(); + Exchange alternateExchange = currentQueue.getAlternateExchange(); - if(alternateExchange != null) + if (alternateExchange != null) + { + final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); + final ServerMessage message = getMessage(); + if (rerouteQueues != null && rerouteQueues.size() != 0) { - final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this)); - final ServerMessage message = getMessage(); - if(rerouteQueues != null && rerouteQueues.size() != 0) - { - ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() { - public void postCommit() + ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog()); + + txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() + { + public void postCommit() + { + try { - try + for (BaseQueue queue : rerouteQueues) { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); + queue.enqueue(message); } } - - public void onRollback() + catch (AMQException e) { - + throw new RuntimeException(e); } - }); - txn.dequeue(currentQueue,message, - new ServerTransaction.Action() - { - public void postCommit() - { - discard(); - } - - public void onRollback() - { - - } - }); + } + + public void onRollback() + { + + } + }); + txn.dequeue(currentQueue, message, new ServerTransaction.Action() + { + public void postCommit() + { + discard(); + } + + public void onRollback() + { + + } + }); } } } @@ -524,4 +534,19 @@ public abstract class QueueEntryImpl implements QueueEntry return _state.isDispensed(); } + public int getDeliveryCount() + { + return _deliveryCount; + } + + public void incrementDeliveryCount() + { + _deliveryCountUpdater.incrementAndGet(this); + } + + public void decrementDeliveryCount() + { + _deliveryCountUpdater.decrementAndGet(this); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ab47d89e01..7717c8ebfc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -187,7 +187,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private long _createTime = System.currentTimeMillis(); private ConfigurationPlugin _queueConfiguration; - + /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ + private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { @@ -356,6 +357,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _alternateExchange = exchange; } + public void setAlternateExchange(String exchangeName) + { + if(exchangeName == null || exchangeName.equals("")) + { + _alternateExchange = null; + return; + } + + Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName)); + if (exchange == null) + { + throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost."); + } + setAlternateExchange(exchange); + } + public Map<String, Object> getArguments() { return _arguments; @@ -521,13 +538,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //Reconfigure the queue for to reflect this new binding. ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this); - if (_logger.isDebugEnabled()) - { - _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); - } - if (config != null) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration); + } // Reconfigure with new config. configure(config); } @@ -2108,6 +2124,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); + setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount()); _capacity = ((QueueConfiguration)config).getCapacity(); _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); } @@ -2229,4 +2246,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _logActor; } + + public int getMaximumDeliveryCount() + { + return _maximumDeliveryCount; + } + + public void setMaximumDeliveryCount(final int maximumDeliveryCount) + { + _maximumDeliveryCount = maximumDeliveryCount; + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java index fdd533b704..7d128f2bc5 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java @@ -20,18 +20,31 @@ */ package org.apache.qpid.server; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedBroker; +import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.queue.AMQPriorityQueue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; -public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase +public class AMQBrokerManagerMBeanTest extends QpidTestCase { private QueueRegistry _queueRegistry; private ExchangeRegistry _exchangeRegistry; @@ -95,14 +108,86 @@ public class AMQBrokerManagerMBeanTest extends InternalBrokerBaseCase assertTrue("New queue should be bound to default exchange", defaultExchange.isBound(new AMQShortString(queueName))); } + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument does cause the + * maximum delivery count to be set on the Queue. + */ + public void testCreateNewQueueWithMaximumDeliveryCount() throws Exception + { + final Map<String,Object> args = new HashMap<String, Object>(); + args.put(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + + final AMQShortString queueName = new AMQShortString("testCreateNewQueueWithMaximumDeliveryCount"); + + final QueueRegistry qReg = _vHost.getQueueRegistry(); + + assertNull("The queue should not yet exist", qReg.getQueue(queueName)); + + final ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); + mbean.createNewQueue(queueName.asString(), "test", false, args); + + final AMQQueue createdQueue = qReg.getQueue(queueName); + assertNotNull("The queue was not registered as expected", createdQueue); + assertEquals("Unexpected maximum delivery count", 5, createdQueue.getMaximumDeliveryCount()); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_PRIORITIES} argument prompts creation of + * a Priority Queue. + */ + public void testCreatePriorityQueue() throws Exception + { + int numPriorities = 7; + Map<String,Object> args = new HashMap<String, Object>(); + args.put(AMQQueueFactory.X_QPID_PRIORITIES, numPriorities); + + AMQShortString queueName = new AMQShortString("testCreatePriorityQueue"); + + QueueRegistry qReg = _vHost.getQueueRegistry(); + + assertNull("The queue should not yet exist", qReg.getQueue(queueName)); + + ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject()); + mbean.createNewQueue(queueName.asString(), "test", false, args); + + AMQQueue queue = qReg.getQueue(queueName); + assertEquals("Queue is not a priorty queue", AMQPriorityQueue.class, queue.getClass()); + assertEquals("Number of priorities supported was not as expected", numPriorities, ((AMQPriorityQueue)queue).getPriorities()); + } + @Override public void setUp() throws Exception { super.setUp(); + + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + + XMLConfiguration configXml = new XMLConfiguration(); + configXml.addProperty("virtualhosts.virtualhost(-1).name", "test"); + configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); + + ServerConfiguration configuration = new ServerConfiguration(configXml); + + ApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName("test"); + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test"); _queueRegistry = _vHost.getQueueRegistry(); _exchangeRegistry = _vHost.getExchangeRegistry(); } + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + ApplicationRegistry.remove(); + } + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java index 9941c00499..e1a5e7d338 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java @@ -24,6 +24,8 @@ import junit.framework.TestCase; import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; public class QueueConfigurationTest extends TestCase { @@ -43,11 +45,71 @@ public class QueueConfigurationTest extends TestCase fullEnv.setProperty("queues.maximumMessageSize", 1); fullEnv.setProperty("queues.maximumMessageCount", 1); fullEnv.setProperty("queues.minimumAlertRepeatGap", 1); + fullEnv.setProperty("queues.deadLetterQueues", true); + fullEnv.setProperty("queues.maximumDeliveryCount", 5); _fullHostConf = new VirtualHostConfiguration("test", fullEnv); } + public void testMaxDeliveryCount() throws Exception + { + try + { + ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env)); + ApplicationRegistry.initialise(registry); + + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertEquals("Unexpected default server configuration for max delivery count ", 0, qConf.getMaxDeliveryCount()); + + // Check explicit value + VirtualHostConfiguration vhostConfig = overrideConfiguration("maximumDeliveryCount", 7); + qConf = new QueueConfiguration("test", vhostConfig); + assertEquals("Unexpected host configuration for max delivery count", 7, qConf.getMaxDeliveryCount()); + + // Check inherited value + qConf = new QueueConfiguration("test", _fullHostConf); + assertEquals("Unexpected queue configuration for max delivery count", 5, qConf.getMaxDeliveryCount()); + + } + finally + { + ApplicationRegistry.remove(); + } + } + + /** + * Tests that the default setting for DLQ configuration is disabled, and verifies that it can be overridden + * at a broker or virtualhost level. + * @throws Exception + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + try + { + ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env)); + ApplicationRegistry.initialise(registry); + + // Check default value + QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf); + assertFalse("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + + // Check explicit value + VirtualHostConfiguration vhostConfig = overrideConfiguration("deadLetterQueues", true); + qConf = new QueueConfiguration("test", vhostConfig); + assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + + // Check inherited value + qConf = new QueueConfiguration("test", _fullHostConf); + assertTrue("Unexpected queue configuration for dead letter enabled attribute", qConf.isDeadLetterQueueEnabled()); + } + finally + { + ApplicationRegistry.remove(); + } + } + public void testGetMaximumMessageAge() throws ConfigurationException { // Check default value diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 9ee2ed3812..7739f9976e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -25,6 +25,7 @@ import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.Writer; import java.util.Locale; import org.apache.commons.configuration.ConfigurationException; @@ -1459,4 +1460,117 @@ public class ServerConfigurationTest extends QpidTestCase ce.getMessage()); } } + + public void testMaxDeliveryCountDefault() throws Exception + { + final ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertEquals(0, serverConfig.getMaxDeliveryCount()); + } + + public void testMaxDeliveryCount() throws Exception + { + _config.setProperty("maximumDeliveryCount", 5); + final ServerConfiguration serverConfig = new ServerConfiguration(_config); + assertEquals(5, serverConfig.getMaxDeliveryCount()); + } + + /** + * Test XML configuration file correctly enables dead letter queues + */ + public void testDeadLetterQueueConfigurationFile() throws Exception + { + // Write config + File xml = File.createTempFile(getClass().getName(), "xml"); + xml.deleteOnExit(); + FileWriter config = new FileWriter(xml); + config.write("<broker>\n"); + writeSecurity(config); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("<virtualhosts>\n"); + config.write("<virtualhost>\n"); + config.write("<name>test</name>\n"); + config.write("<test>\n"); + config.write("<queues>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("<queue>\n"); + config.write("<name>biggles</name>\n"); + config.write("<biggles>\n"); + config.write("<deadLetterQueues>true</deadLetterQueues>\n"); + config.write("</biggles>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>beetle</name>\n"); + config.write("<beetle />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</test>\n"); + config.write("</virtualhost>\n"); + config.write("<virtualhost>\n"); + config.write("<name>extra</name>\n"); + config.write("<extra>\n"); + config.write("<queues>\n"); + config.write("<queue>\n"); + config.write("<name>r2d2</name>\n"); + config.write("<r2d2>\n"); + config.write("<deadLetterQueues>false</deadLetterQueues>\n"); + config.write("</r2d2>\n"); + config.write("</queue>\n"); + config.write("<queue>\n"); + config.write("<name>c3p0</name>\n"); + config.write("<c3p0 />\n"); + config.write("</queue>\n"); + config.write("</queues>\n"); + config.write("</extra>\n"); + config.write("</virtualhost>\n"); + config.write("</virtualhosts>\n"); + config.write("</broker>\n"); + config.close(); + + // Load config + ApplicationRegistry.remove(); + ApplicationRegistry registry = new ConfigurationFileApplicationRegistry(xml); + ApplicationRegistry.initialise(registry); + ServerConfiguration serverConfiguration = ApplicationRegistry.getInstance().getConfiguration(); + + VirtualHostConfiguration test = serverConfiguration.getVirtualHostConfig("test"); + assertNotNull("Host 'test' is not found", test); + VirtualHostConfiguration extra = serverConfiguration.getVirtualHostConfig("extra"); + assertNotNull("Host 'extra' is not found", test); + + QueueConfiguration biggles = test.getQueueConfiguration("biggles"); + QueueConfiguration beetle = test.getQueueConfiguration("beetle"); + QueueConfiguration r2d2 = extra.getQueueConfiguration("r2d2"); + QueueConfiguration c3p0 = extra.getQueueConfiguration("c3p0"); + + // Validate config + assertTrue("Broker DLQ should be configured as enabled", serverConfiguration.isDeadLetterQueueEnabled()); + assertFalse("Test vhost DLQ should be configured as disabled", test.isDeadLetterQueueEnabled()); + assertTrue("Extra vhost DLQ should be enabled, using broker default", extra.isDeadLetterQueueEnabled()); + assertTrue("Biggles queue DLQ should be configured as enabled", biggles.isDeadLetterQueueEnabled()); + assertFalse("Beetle queue DLQ should be disabled, using test vhost default", beetle.isDeadLetterQueueEnabled()); + assertFalse("R2D2 queue DLQ should be configured as disabled", r2d2.isDeadLetterQueueEnabled()); + assertTrue("C3P0 queue DLQ should be enabled, using broker default", c3p0.isDeadLetterQueueEnabled()); + } + + /** + * Convenience method to output required security preamble for broker config + */ + private void writeSecurity(Writer out) throws Exception + { + out.write("\t<management><enabled>false</enabled></management>\n"); + out.write("\t<security>\n"); + out.write("\t\t<pd-auth-manager>\n"); + out.write("\t\t\t<principal-database>\n"); + out.write("\t\t\t\t<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>\n"); + out.write("\t\t\t\t<attributes>\n"); + out.write("\t\t\t\t\t<attribute>\n"); + out.write("\t\t\t\t\t\t<name>passwordFile</name>\n"); + out.write("\t\t\t\t\t\t<value>/dev/null</value>\n"); + out.write("\t\t\t\t\t</attribute>\n"); + out.write("\t\t\t\t</attributes>\n"); + out.write("\t\t\t</principal-database>\n"); + out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n"); + out.write("\t\t</pd-auth-manager>\n"); + out.write("\t</security>\n"); + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index b133d53ac5..f6cd397217 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; @@ -34,19 +33,6 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase { @Override - public void setUp() throws Exception - { - super.setUp(); - // Set the default configuration items - getConfigXml().clear(); - getConfigXml().addProperty("virtualhosts.virtualhost(-1).name", "test"); - getConfigXml().addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName()); - - getConfigXml().addProperty("virtualhosts.virtualhost.name", getName()); - getConfigXml().addProperty("virtualhosts.virtualhost."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); - } - - @Override public void createBroker() { // Prevent auto broker startup @@ -134,6 +120,88 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase assertEquals(3, bTest.getMaximumMessageAge()); } + public void testMaxDeliveryCount() throws Exception + { + // Set up vhosts and queues + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.maximumDeliveryCount", 5); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.maximumDeliveryCount", 4); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle"); + + // Start the broker now. + super.createBroker(); + + // Get vhosts + VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName()); + + // Enabled specifically + assertEquals("Test vhost MDC was configured as enabled", 5 ,test.getConfiguration().getMaxDeliveryCount()); + + // Enabled by test vhost default + assertEquals("beetle queue DLQ was configured as enabled", test.getConfiguration().getMaxDeliveryCount(), test.getConfiguration().getQueueConfiguration("beetle").getMaxDeliveryCount()); + + // Disabled specifically + assertEquals("Biggles queue DLQ was configured as disabled", 4, test.getConfiguration().getQueueConfiguration("biggles").getMaxDeliveryCount()); + } + + /** + * Tests the full set of configuration options for enabling DLQs in the broker configuration. + */ + public void testIsDeadLetterQueueEnabled() throws Exception + { + // Set up vhosts and queues + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.deadLetterQueues", "true"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "biggles"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues.queue.biggles.deadLetterQueues", "false"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + ".queues(-1).queue(-1).name", "beetle"); + + + getConfigXml().addProperty("virtualhosts.virtualhost.name", getName() + "Extra"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "r2d2"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues.queue.r2d2.deadLetterQueues", "true"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.queues(-1).queue(-1).name", "c3p0"); + getConfigXml().addProperty("virtualhosts.virtualhost." + getName() + "Extra.store.class", TestableMemoryMessageStore.class.getName()); + + // Start the broker now. + super.createBroker(); + + // Get vhosts + VirtualHost test = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName()); + VirtualHost extra = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost(getName() + "Extra"); + + // Enabled specifically + assertTrue("Test vhost DLQ was configured as enabled", test.getConfiguration().isDeadLetterQueueEnabled()); + assertTrue("r2d2 queue DLQ was configured as enabled", extra.getConfiguration().getQueueConfiguration("r2d2").isDeadLetterQueueEnabled()); + + // Enabled by test vhost default + assertTrue("beetle queue DLQ was configured as enabled", test.getConfiguration().getQueueConfiguration("beetle").isDeadLetterQueueEnabled()); + + // Disabled specifically + assertFalse("Biggles queue DLQ was configured as disabled", test.getConfiguration().getQueueConfiguration("biggles").isDeadLetterQueueEnabled()); + + // Using broker default of disabled + assertFalse("Extra vhost DLQ disabled, using broker default", extra.getConfiguration().isDeadLetterQueueEnabled()); + assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); + + // Get queues + AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); + AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); + AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); + AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + + // Disabled specifically for this queue, overriding virtualhost setting + assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); + + // Enabled for all queues on the virtualhost + assertNotNull("Beetle queue should have an alt exchange as DLQ should be enabled, using test vhost default", beetle.getAlternateExchange()); + + // Enabled specifically for this queue, overriding the default broker setting of disabled + assertNotNull("R2D2 queue should have an alt exchange as DLQ should be configured as enabled", r2d2.getAlternateExchange()); + + // Disabled by the default broker setting + assertNull("C3PO queue should not have an alt exchange as DLQ should be disabled, using broker default", c3p0.getAlternateExchange()); + } + /** * Test that the house keeping pool sizes is correctly processed * @@ -173,7 +241,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase vhost.getHouseKeepingTaskCount()); // Currently the two are tasks: - // ExpiredMessageTask from VirtualHost + // ExpiredMessageTask from VirtualHost // UpdateTask from the QMF ManagementExchange } @@ -214,7 +282,7 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase { getConfigXml().addProperty("virtualhosts.virtualhost.testSecurityAuthenticationNameRejected.security.authentication.name", "testdb"); - + try { super.createBroker(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 7b73987abf..ea2fe90da6 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -490,6 +490,22 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase { return null; } + + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + } + + @Override + public void decrementDeliveryCount() + { + } }; if(action != null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 27891289fb..2b7d1d7f26 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,39 +20,76 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; -public class AMQQueueFactoryTest extends InternalBrokerBaseCase +public class AMQQueueFactoryTest extends QpidTestCase { QueueRegistry _queueRegistry; VirtualHost _virtualHost; - int _defaultQueueCount; @Override public void setUp() throws Exception { super.setUp(); - ApplicationRegistry registry = (ApplicationRegistry) ApplicationRegistry.getInstance(); - _virtualHost = registry.getVirtualHostRegistry().getVirtualHost("test"); + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + + XMLConfiguration configXml = new XMLConfiguration(); + configXml.addProperty("virtualhosts.virtualhost(-1).name", getName()); + configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName()); + + ServerConfiguration configuration = new ServerConfiguration(configXml); + + ApplicationRegistry registry = new TestApplicationRegistry(configuration); + ApplicationRegistry.initialise(registry); + registry.getVirtualHostRegistry().setDefaultVirtualHostName(getName()); + + _virtualHost = registry.getVirtualHostRegistry().getVirtualHost(getName()); _queueRegistry = _virtualHost.getQueueRegistry(); - _defaultQueueCount = _queueRegistry.getQueues().size(); } @Override public void tearDown() throws Exception { - assertEquals("Queue was not registered in virtualhost", _defaultQueueCount + 1, _queueRegistry.getQueues().size()); - super.tearDown(); + try + { + super.tearDown(); + } + finally + { + ApplicationRegistry.remove(); + } + } + + private void verifyRegisteredQueueCount(int count) + { + assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size()); } + private void verifyQueueRegistered(String queueName) + { + assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName)); + } + public void testPriorityQueueRegistration() throws Exception { FieldTable fieldTable = new FieldTable(); @@ -63,13 +100,314 @@ public class AMQQueueFactoryTest extends InternalBrokerBaseCase false, _virtualHost, fieldTable); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); + verifyQueueRegistered("testPriorityQueue"); + verifyRegisteredQueueCount(1); } public void testSimpleQueueRegistration() throws Exception { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false, + AMQShortString queueName = new AMQShortString("testSimpleQueueRegistration"); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, _virtualHost, null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); + verifyQueueRegistered("testSimpleQueueRegistration"); + + //verify that no alternate exchange or DLQ were produced + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + + assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange()); + assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName)); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does + * cause the alternate exchange to be set and DLQ to be produced. + * @throws AMQException + */ + public void testDeadLetterQueueEnabled() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + Exchange altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); + + assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + + AMQQueue dlQueue = qReg.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration + * are not applied to the DLQ itself. + * @throws AMQException + */ + public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws AMQException + { + ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("deadLetterQueues","true"); + ApplicationRegistry.getInstance().getConfiguration().getConfig().addProperty("maximumDeliveryCount","5"); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueEnabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not yet exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, null); + + assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); + Exchange altExchange = queue.getAlternateExchange(); + assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); + assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName()); + assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType().getName()); + + assertNotNull("The alternate exchange was not registered as expected", exReg.getExchange(dlExchangeName)); + assertEquals("The registered exchange was not the expected exchange instance", altExchange, exReg.getExchange(dlExchangeName)); + + AMQQueue dlQueue = qReg.getQueue(dlQueueName); + assertNotNull("The DLQ was not registered as expected", dlQueue); + assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); + assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); + assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryCount()); + + //2 queues should have been registered + verifyRegisteredQueueCount(2); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not + * result in the alternate exchange being set and DLQ being created. + * @throws AMQException + */ + public void testDeadLetterQueueDisabled() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueDisabled"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); + assertNull("The alternate exchange should still not exist", exReg.getExchange(dlExchangeName)); + + assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but + * creating an auto-delete queue, does not result in the alternate exchange + * being set and DLQ being created. + * @throws AMQException + */ + public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException + { + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + + AMQShortString queueName = new AMQShortString("testDeadLetterQueueNotCreatedForAutodeleteQueues"); + AMQShortString dlExchangeName = new AMQShortString(queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + + QueueRegistry qReg = _virtualHost.getQueueRegistry(); + ExchangeRegistry exReg = _virtualHost.getExchangeRegistry(); + + assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The alternate exchange should not exist", exReg.getExchange(dlExchangeName)); + + //create an autodelete queue + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), true, false, + _virtualHost, fieldTable); + assertTrue("Queue should be autodelete", queue.isAutoDelete()); + + //ensure that the autodelete property overrides the request to enable DLQ + assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); + assertNull("The alternate exchange should not exist as queue is autodelete", exReg.getExchange(dlExchangeName)); + assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); + + //only 1 queue should have been registered + verifyRegisteredQueueCount(1); + } + + /** + * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * the desired effect. + */ + public void testMaximumDeliveryCount() throws Exception + { + final FieldTable fieldTable = new FieldTable(); + fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + + final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount"); + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, fieldTable); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * that queue is created with a default maximumDeliveryCount of zero (unless set in config). + */ + public void testMaximumDeliveryCountDefault() throws Exception + { + + final AMQShortString queueName = new AMQShortString("testMaximumDeliveryCount"); + + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, new AMQShortString("owner"), false, false, + _virtualHost, null); + + assertNotNull("The queue was not registered as expected ", queue); + assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); + + verifyRegisteredQueueCount(1); + } + + /** + * Tests queue creation with queue name set to null + */ + public void testQueueNameNullValidation() + { + try + { + AMQQueueFactory.createAMQQueueImpl(null, false, new AMQShortString("owner"), true, false, _virtualHost, null); + fail("queue with null name can not be created!"); + } + catch (Exception e) + { + assertTrue(e instanceof IllegalArgumentException); + assertEquals("Queue name must not be null", e.getMessage()); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DLQ name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", "_DLE"); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", "_DLQUEUE"); + + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"), + false, false, _virtualHost, fieldTable); + fail("queue with DLQ name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name") + && e.getMessage().contains("length exceeds limit of 255")); + } + finally + { + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + + /** + * Tests queue creation with queue name length less 255 characters but + * corresponding DL exchange name length greater than 255. + */ + public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255() + { + String queueName = "test-" + generateStringWithLength('a', 245); + try + { + // change DLQ name to make its length bigger than exchange name + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", "_DLEXCHANGE"); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", "_DLQ"); + + FieldTable fieldTable = new FieldTable(); + fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + AMQQueueFactory.createAMQQueueImpl(new AMQShortString(queueName), false, new AMQShortString("owner"), + false, false, _virtualHost, fieldTable); + fail("queue with DLE name having more than 255 characters can not be created!"); + } + catch (Exception e) + { + assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException); + assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name") + && e.getMessage().contains("length exceeds limit of 255")); + } + finally + { + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX); + ApplicationRegistry.getInstance().getConfiguration().getConfig() + .addProperty("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + } + } + + private String generateStringWithLength(char ch, int length) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.append(ch); + } + return sb.toString(); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 8f3023f269..f70250132a 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -44,6 +44,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -390,6 +391,34 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertFalse(channel.getBlocking()); } + public void testMaximumDeliveryCount() throws IOException + { + assertEquals("Unexpected default maximum delivery count", Integer.valueOf(0), _queueMBean.getMaximumDeliveryCount()); + } + + public void testViewAllMessages() throws Exception + { + final int messageCount = 5; + sendPersistentMessages(messageCount); + + + final TabularData messageTable = _queueMBean.viewMessages(1L, 5L); + assertNotNull("Message table should not be null", messageTable); + assertEquals("Unexpected number of rows", messageCount, messageTable.size()); + + + final Iterator rowIterator = messageTable.values().iterator(); + // Get its message ID + final CompositeDataSupport row1 = (CompositeDataSupport) rowIterator.next(); + final Long msgId = (Long) row1.get("AMQ MessageId"); + final Long queuePosition = (Long) row1.get("Queue Position"); + final Integer deliveryCount = (Integer) row1.get("Delivery Count"); + + assertNotNull("Row should have value for queue position", queuePosition); + assertNotNull("Row should have value for msgid", msgId); + assertNotNull("Row should have value for deliveryCount", deliveryCount); + } + @Override public void setUp() throws Exception @@ -404,6 +433,13 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase ApplicationRegistry.remove(); } + private void sendPersistentMessages(int messageCount) throws AMQException + { + sendMessages(messageCount, true); + assertEquals("Expected " + messageCount + " messages in the queue", messageCount, _queueMBean + .getMessageCount().intValue()); + } + private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException { return sendMessages(messageCount, persistent, 0l, 0l); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 4c31092983..0daf79122c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -611,4 +611,21 @@ public class MockAMQQueue implements AMQQueue { } + + @Override + public int getMaximumDeliveryCount() + { + return 0; + } + + @Override + public void setMaximumDeliveryCount(int maximumDeliveryCount) + { + } + + @Override + public void setAlternateExchange(String exchangeName) + { + } + } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 864b9ad368..7ad002c248 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -244,4 +244,21 @@ public class MockQueueEntry implements QueueEntry return null; } + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + } + + @Override + public void decrementDeliveryCount() + { + } + + } |
