summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java11
2 files changed, 13 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 254348dba0..056fb5fc01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -81,8 +81,8 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+ private Notification _lastNotification = null;
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
@@ -256,11 +256,17 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
// important : add log to the log file - monitoring tools may be looking for this
_logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
-
- Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+ notificationMsg = notification.name() + " " + notificationMsg;
+
+ _lastNotification = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
- _broadcaster.sendNotification(n);
+ _broadcaster.sendNotification(_lastNotification);
+ }
+
+ public Notification getLastNotification()
+ {
+ return _lastNotification;
}
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 5cf08c857e..87868f0b25 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -354,14 +354,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
_lock.lock();
- AMQMessage msg = getNextMessage();
- if (msg != null)
- {
- // mark this message as taken and get it removed
- msg.taken(null);
- _queue.dequeue(storeContext, msg);
- getNextMessage();
- }
+
+ AMQMessage message = _messages.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
_lock.unlock();
}