diff options
| author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-27 09:07:30 +0000 |
|---|---|---|
| committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-27 09:07:30 +0000 |
| commit | fb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1 (patch) | |
| tree | 0a59a872ff849d3056c803836aed199f8a49c1e2 /java/broker | |
| parent | 4b5818ed830c97978771ffe7be1ff2ac587bd989 (diff) | |
| download | qpid-python-fb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1.tar.gz | |
merged from M2 (r521792:522567) QPID-408 QPID-421 QPID-428
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@522821 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
7 files changed, 56 insertions, 19 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f17a6fb60a..a418bb8f8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -118,7 +118,7 @@ public class AMQQueue implements Managable, Comparable /** max allowed number of messages on a queue. */ @Configured(path = "maximumMessageCount", defaultValue = "0") - public int _maximumMessageCount; + public long _maximumMessageCount; /** max queue depth for the queue */ @Configured(path = "maximumQueueDepth", defaultValue = "0") @@ -350,12 +350,12 @@ public class AMQQueue implements Managable, Comparable return _totalMessagesReceived.get(); } - public int getMaximumMessageCount() + public long getMaximumMessageCount() { return _maximumMessageCount; } - public void setMaximumMessageCount(int value) + public void setMaximumMessageCount(long value) { _maximumMessageCount = value; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 056fb5fc01..7a32848c44 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -20,6 +20,8 @@ package org.apache.qpid.server.queue; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Date; +import java.text.SimpleDateFormat; import javax.management.JMException; import javax.management.MBeanException; @@ -44,6 +46,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -58,8 +61,8 @@ import org.apache.qpid.server.store.StoreContext; @MBeanDescription("Management Interface for AMQQueue") public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { - private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); /** * Since the MBean is not associated with a real channel we can safely create our own store context @@ -197,12 +200,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.getReceivedMessageCount(); } - public Integer getMaximumMessageCount() + public Long getMaximumMessageCount() { return _queue.getMaximumMessageCount(); } - public void setMaximumMessageCount(Integer value) + public void setMaximumMessageCount(Long value) { _queue.setMaximumMessageCount(value); } @@ -370,8 +373,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que AMQMessage msg = list.get(i - 1); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties; - String[] headerAttributes = headerProperties.toString().split(","); + String[] headerAttributes = getMessageHeaderProperties(headerBody); Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); @@ -385,6 +387,35 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _messageList; } + private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) + { + List<String> list = new ArrayList<String>(); + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + list.add("reply-to = " + headerProperties.getReplyToAsString()); + list.add("propertyFlags = " + headerProperties.getPropertyFlags()); + list.add("ApplicationID = " + headerProperties.getAppIdAsString()); + list.add("ClusterID = " + headerProperties.getClusterIdAsString()); + list.add("UserId = " + headerProperties.getUserIdAsString()); + list.add("JMSMessageID = " + headerProperties.getMessageIdAsString()); + list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); + + int delMode = headerProperties.getDeliveryMode(); + list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent")); + + list.add("JMSPriority = " + headerProperties.getPriority()); + list.add("JMSType = " + headerProperties.getType()); + + long longDate = headerProperties.getExpiration(); + String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSExpiration = " + strDate); + + longDate = headerProperties.getTimestamp(); + strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSTimestamp = " + strDate); + + return list.toArray(new String[list.size()]); + } + /** * @see ManagedQueue#moveMessages * @param fromMessageId diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 879080e10c..cfa13c87fd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -401,7 +401,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); AMQMessage message = _messages.poll(); - _totalMessageSize.addAndGet(-message.getSize()); + if (message != null) + { + _totalMessageSize.addAndGet(-message.getSize()); + } _lock.unlock(); } @@ -539,7 +542,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { subscriberHasPendingResend(false, sub, null); //better to use the above method as this keeps all the tracking in one location. -// _hasContent.remove(sub); + // _hasContent.remove(sub); } _extraMessages.decrementAndGet(); @@ -552,7 +555,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - _totalMessageSize.addAndGet(-message.getSize()); + if ((message != null) && (messageQueue == _messages)) + { + _totalMessageSize.addAndGet(-message.getSize()); + } } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java index 9b926be82d..061ab56024 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java @@ -156,7 +156,7 @@ public interface ManagedQueue * @return maximum muber of message allowed to be stored in the queue. * @throws IOException */ - Integer getMaximumMessageCount() throws IOException; + Long getMaximumMessageCount() throws IOException; /** * Sets the maximum number of messages allowed to be stored in the queue. @@ -164,7 +164,7 @@ public interface ManagedQueue * @throws IOException */ @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue") - void setMaximumMessageCount(Integer value) throws IOException; + void setMaximumMessageCount(Long value) throws IOException; /** * This is useful for setting notifications or taking required action if the size of messages diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index bc8e1232a7..00ccffdea1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -27,7 +27,7 @@ public enum NotificationCheck boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount = queue.getMessageCount();
- final int maximumMessageCount = queue.getMaximumMessageCount();
+ final long maximumMessageCount = queue.getMaximumMessageCount();
if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 1eb3506720..236291968f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -42,7 +42,7 @@ import java.util.HashSet; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase { - private final static int MAX_MESSAGE_COUNT = 50; + private final static long MAX_MESSAGE_COUNT = 50; private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB @@ -175,7 +175,7 @@ public class AMQQueueAlertTest extends TestCase new AMQShortString("consumer_tag"), true, null, false, false); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); - _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested + _queueMBean.setMaximumMessageCount(9999l); // Set a high value, because this is not being tested _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); // Send messages(no of message to be little more than what can cause a Queue_Depth alert) @@ -268,9 +268,9 @@ public class AMQQueueAlertTest extends TestCase _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); } - private void sendMessages(int messageCount, long size) throws AMQException + private void sendMessages(long messageCount, long size) throws AMQException { - AMQMessage[] messages = new AMQMessage[messageCount]; + AMQMessage[] messages = new AMQMessage[(int)messageCount]; for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 182c6a2d01..551eb8f0a0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -116,7 +116,7 @@ public class AMQQueueMBeanTest extends TestCase public void testGeneralProperties() { long maxQueueDepth = 1000; // in bytes - _queueMBean.setMaximumMessageCount(50000); + _queueMBean.setMaximumMessageCount(50000l); _queueMBean.setMaximumMessageSize(2000l); _queueMBean.setMaximumQueueDepth(maxQueueDepth); |
