summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-27 09:07:30 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-27 09:07:30 +0000
commitfb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1 (patch)
tree0a59a872ff849d3056c803836aed199f8a49c1e2 /java/broker
parent4b5818ed830c97978771ffe7be1ff2ac587bd989 (diff)
downloadqpid-python-fb14a2042dd5bdae5a5c79b8cd4f1ad87e59bee1.tar.gz
merged from M2 (r521792:522567) QPID-408 QPID-421 QPID-428
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@522821 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java41
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
7 files changed, 56 insertions, 19 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index f17a6fb60a..a418bb8f8a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -118,7 +118,7 @@ public class AMQQueue implements Managable, Comparable
/** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
- public int _maximumMessageCount;
+ public long _maximumMessageCount;
/** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
@@ -350,12 +350,12 @@ public class AMQQueue implements Managable, Comparable
return _totalMessagesReceived.get();
}
- public int getMaximumMessageCount()
+ public long getMaximumMessageCount()
{
return _maximumMessageCount;
}
- public void setMaximumMessageCount(int value)
+ public void setMaximumMessageCount(long value)
{
_maximumMessageCount = value;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 056fb5fc01..7a32848c44 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -20,6 +20,8 @@ package org.apache.qpid.server.queue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Date;
+import java.text.SimpleDateFormat;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -44,6 +46,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -58,8 +61,8 @@ import org.apache.qpid.server.store.StoreContext;
@MBeanDescription("Management Interface for AMQQueue")
public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
-
private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+ private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
/**
* Since the MBean is not associated with a real channel we can safely create our own store context
@@ -197,12 +200,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _queue.getReceivedMessageCount();
}
- public Integer getMaximumMessageCount()
+ public Long getMaximumMessageCount()
{
return _queue.getMaximumMessageCount();
}
- public void setMaximumMessageCount(Integer value)
+ public void setMaximumMessageCount(Long value)
{
_queue.setMaximumMessageCount(value);
}
@@ -370,8 +373,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
AMQMessage msg = list.get(i - 1);
ContentHeaderBody headerBody = msg.getContentHeaderBody();
// Create header attributes list
- CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties;
- String[] headerAttributes = headerProperties.toString().split(",");
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()};
CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
_messageList.put(messageData);
@@ -385,6 +387,35 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _messageList;
}
+ private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
+ {
+ List<String> list = new ArrayList<String>();
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ list.add("reply-to = " + headerProperties.getReplyToAsString());
+ list.add("propertyFlags = " + headerProperties.getPropertyFlags());
+ list.add("ApplicationID = " + headerProperties.getAppIdAsString());
+ list.add("ClusterID = " + headerProperties.getClusterIdAsString());
+ list.add("UserId = " + headerProperties.getUserIdAsString());
+ list.add("JMSMessageID = " + headerProperties.getMessageIdAsString());
+ list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString());
+
+ int delMode = headerProperties.getDeliveryMode();
+ list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent"));
+
+ list.add("JMSPriority = " + headerProperties.getPriority());
+ list.add("JMSType = " + headerProperties.getType());
+
+ long longDate = headerProperties.getExpiration();
+ String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
+ list.add("JMSExpiration = " + strDate);
+
+ longDate = headerProperties.getTimestamp();
+ strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null;
+ list.add("JMSTimestamp = " + strDate);
+
+ return list.toArray(new String[list.size()]);
+ }
+
/**
* @see ManagedQueue#moveMessages
* @param fromMessageId
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 879080e10c..cfa13c87fd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -401,7 +401,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
AMQMessage message = _messages.poll();
- _totalMessageSize.addAndGet(-message.getSize());
+ if (message != null)
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
_lock.unlock();
}
@@ -539,7 +542,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
subscriberHasPendingResend(false, sub, null);
//better to use the above method as this keeps all the tracking in one location.
-// _hasContent.remove(sub);
+ // _hasContent.remove(sub);
}
_extraMessages.decrementAndGet();
@@ -552,7 +555,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- _totalMessageSize.addAndGet(-message.getSize());
+ if ((message != null) && (messageQueue == _messages))
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
index 9b926be82d..061ab56024 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
@@ -156,7 +156,7 @@ public interface ManagedQueue
* @return maximum muber of message allowed to be stored in the queue.
* @throws IOException
*/
- Integer getMaximumMessageCount() throws IOException;
+ Long getMaximumMessageCount() throws IOException;
/**
* Sets the maximum number of messages allowed to be stored in the queue.
@@ -164,7 +164,7 @@ public interface ManagedQueue
* @throws IOException
*/
@MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue")
- void setMaximumMessageCount(Integer value) throws IOException;
+ void setMaximumMessageCount(Long value) throws IOException;
/**
* This is useful for setting notifications or taking required action if the size of messages
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index bc8e1232a7..00ccffdea1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -27,7 +27,7 @@ public enum NotificationCheck
boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount = queue.getMessageCount();
- final int maximumMessageCount = queue.getMaximumMessageCount();
+ final long maximumMessageCount = queue.getMaximumMessageCount();
if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 1eb3506720..236291968f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -42,7 +42,7 @@ import java.util.HashSet;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
{
- private final static int MAX_MESSAGE_COUNT = 50;
+ private final static long MAX_MESSAGE_COUNT = 50;
private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB
@@ -175,7 +175,7 @@ public class AMQQueueAlertTest extends TestCase
new AMQShortString("consumer_tag"), true, null, false, false);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
- _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested
+ _queueMBean.setMaximumMessageCount(9999l); // Set a high value, because this is not being tested
_queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
// Send messages(no of message to be little more than what can cause a Queue_Depth alert)
@@ -268,9 +268,9 @@ public class AMQQueueAlertTest extends TestCase
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
}
- private void sendMessages(int messageCount, long size) throws AMQException
+ private void sendMessages(long messageCount, long size) throws AMQException
{
- AMQMessage[] messages = new AMQMessage[messageCount];
+ AMQMessage[] messages = new AMQMessage[(int)messageCount];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 182c6a2d01..551eb8f0a0 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -116,7 +116,7 @@ public class AMQQueueMBeanTest extends TestCase
public void testGeneralProperties()
{
long maxQueueDepth = 1000; // in bytes
- _queueMBean.setMaximumMessageCount(50000);
+ _queueMBean.setMaximumMessageCount(50000l);
_queueMBean.setMaximumMessageSize(2000l);
_queueMBean.setMaximumQueueDepth(maxQueueDepth);