summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-11-25 22:38:22 +0000
committerKeith Wall <kwall@apache.org>2012-11-25 22:38:22 +0000
commit553710214d0cabf62129d415b28683e7713c6bab (patch)
treef27e4b359d8f7b041aec55b7e5fe525a25111af9 /qpid/java
parent9e541180a7ecb8f73cce17c6545c6edba0ea7ed4 (diff)
downloadqpid-python-553710214d0cabf62129d415b28683e7713c6bab.tar.gz
QPID-4470: Allow 'maximum queue depth' queue alerting threshold to be set from JMX/AMQP
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1413433 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java27
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java30
3 files changed, 58 insertions, 14 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 78f6d38d93..b907d4177a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -50,15 +50,16 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>();
static
{
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, "x-qpid-minimum-alert-repeat-gap");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, "x-qpid-maximum-message-age");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, "x-qpid-maximum-message-size");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, "x-qpid-maximum-message-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, "x-qpid-maximum-delivery-count");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity");
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY);
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 3a18fae2ec..6a1c3bd893 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -41,6 +41,14 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
@@ -119,42 +127,49 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty("x-qpid-maximum-message-age")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-size")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-count")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumQueueDepth(value);
+ }
+ },
+ new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty("x-qpid-capacity")
+ new QueueLongProperty(X_QPID_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty("x-qpid-flow-resume-capacity")
+ new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
index 79d04b239e..10a2f954d0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
@@ -206,13 +206,41 @@ public class QueueManagementTest extends QpidBrokerTestCase
managedBroker.createNewQueue(queueName, null, true, arguments);
// Ensure the queue exists
- assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));
final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount());
}
+ public void testCreateQueueWithAlertingThresholdsSet() throws Exception
+ {
+ final String queueName = getName();
+ final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
+
+ final Long maximumMessageCount = 100l;
+ final Long maximumMessageSize = 200l;
+ final Long maximumQueueDepth = 300l;
+ final Long maximumMessageAge = 400l;
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT, maximumMessageCount);
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE, maximumMessageSize);
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH, maximumQueueDepth);
+ arguments.put(AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE, maximumMessageAge);
+
+ managedBroker.createNewQueue(queueName, null, true, arguments);
+
+ // Ensure the queue exists
+ assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName(VIRTUAL_HOST, queueName));
+ assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));
+
+ ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ assertEquals("Unexpected maximum message count", maximumMessageCount, managedQueue.getMaximumMessageCount());
+ assertEquals("Unexpected maximum message size", maximumMessageSize, managedQueue.getMaximumMessageSize());
+ assertEquals("Unexpected maximum queue depth", maximumQueueDepth, managedQueue.getMaximumQueueDepth());
+ assertEquals("Unexpected maximum message age", maximumMessageAge, managedQueue.getMaximumMessageAge());
+ }
+
/**
* Requires 0-10 as relies on ADDR addresses.
* @see AddressBasedDestinationTest for the testing of message routing to the alternate exchange