diff options
| author | Keith Wall <kwall@apache.org> | 2012-11-25 22:38:22 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-11-25 22:38:22 +0000 |
| commit | 553710214d0cabf62129d415b28683e7713c6bab (patch) | |
| tree | f27e4b359d8f7b041aec55b7e5fe525a25111af9 /qpid/java | |
| parent | 9e541180a7ecb8f73cce17c6545c6edba0ea7ed4 (diff) | |
| download | qpid-python-553710214d0cabf62129d415b28683e7713c6bab.tar.gz | |
QPID-4470: Allow 'maximum queue depth' queue alerting threshold to be set from JMX/AMQP
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1413433 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 58 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 78f6d38d93..b907d4177a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -50,15 +50,16 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>(); static { - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, "x-qpid-minimum-alert-repeat-gap"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, "x-qpid-maximum-message-age"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, "x-qpid-maximum-message-size"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, "x-qpid-maximum-message-count"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, "x-qpid-maximum-delivery-count"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity"); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 3a18fae2ec..6a1c3bd893 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -41,6 +41,14 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class AMQQueueFactory { + public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity"; + public static final String X_QPID_CAPACITY = "x-qpid-capacity"; + public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap"; + public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count"; + public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size"; + public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age"; + public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth"; + public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; public static final String X_QPID_DESCRIPTION = "x-qpid-description"; public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; @@ -119,42 +127,49 @@ public class AMQQueueFactory } private static final QueueProperty[] DECLAREABLE_PROPERTIES = { - new QueueLongProperty("x-qpid-maximum-message-age") + new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageAge(value); } }, - new QueueLongProperty("x-qpid-maximum-message-size") + new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageSize(value); } }, - new QueueLongProperty("x-qpid-maximum-message-count") + new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageCount(value); } }, - new QueueLongProperty("x-qpid-minimum-alert-repeat-gap") + new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH) + { + public void setPropertyValue(AMQQueue queue, long value) + { + queue.setMaximumQueueDepth(value); + } + }, + new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMinimumAlertRepeatGap(value); } }, - new QueueLongProperty("x-qpid-capacity") + new QueueLongProperty(X_QPID_CAPACITY) { public void setPropertyValue(AMQQueue queue, long value) { queue.setCapacity(value); } }, - new QueueLongProperty("x-qpid-flow-resume-capacity") + new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY) { public void setPropertyValue(AMQQueue queue, long value) { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java index 79d04b239e..10a2f954d0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java @@ -206,13 +206,41 @@ public class QueueManagementTest extends QpidBrokerTestCase managedBroker.createNewQueue(queueName, null, true, arguments); // Ensure the queue exists - assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName)); + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount()); } + public void testCreateQueueWithAlertingThresholdsSet() throws Exception + { + final String queueName = getName(); + final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST); + + final Long maximumMessageCount = 100l; + final Long maximumMessageSize = 200l; + final Long maximumQueueDepth = 300l; + final Long maximumMessageAge = 400l; + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount); + arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize); + arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth); + arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge); + + managedBroker.createNewQueue(queueName, null, true, arguments); + + // Ensure the queue exists + assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName)); + assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName)); + + ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName); + assertEquals("Unexpected maximum message count", maximumMessageCount, managedQueue.getMaximumMessageCount()); + assertEquals("Unexpected maximum message size", maximumMessageSize, managedQueue.getMaximumMessageSize()); + assertEquals("Unexpected maximum queue depth", maximumQueueDepth, managedQueue.getMaximumQueueDepth()); + assertEquals("Unexpected maximum message age", maximumMessageAge, managedQueue.getMaximumMessageAge()); + } + /** * Requires 0-10 as relies on ADDR addresses. * @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange |
