summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-07 13:14:06 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-07 13:14:06 +0000
commit400ffb496c30a2a6554af7cf7739db40b497e5c2 (patch)
tree153a0795d2cc0f0d32b3dcb630d5f82e1f233b6f
parent87456620af31532eb5af81c0207e7533ae67fb39 (diff)
downloadqpid-python-400ffb496c30a2a6554af7cf7739db40b497e5c2.tar.gz
QPID-5970 : [Java Broker] Expose "age of oldest message on queue" to management apis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616487 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java47
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java20
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java56
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java53
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java40
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java31
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java22
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java6
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java6
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java30
-rw-r--r--qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java9
20 files changed, 355 insertions, 37 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 198c0a1cb9..d397dd57b6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -161,7 +161,10 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private Reference(final AbstractServerMessageImpl<X, T> message)
{
_message = message;
- _message.incrementReference();
+ if(!_message.incrementReference())
+ {
+ throw new MessageDeletedException(message.getMessageNumber());
+ }
}
public X getMessage()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java
new file mode 100644
index 0000000000..daa73720e5
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public class MessageDeletedException extends RuntimeException
+{
+ MessageDeletedException(final long messageNumber)
+ {
+ super("The message with id " + messageNumber + " has already been deleted, no new reference can be taken");
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 4e216925e4..073740f07c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -210,4 +210,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getUnacknowledgedMessages();
+ @ManagedStatistic
+ long getOldestMessageAge();
+
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c889fa7740..54f3c4de09 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -57,6 +57,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
@@ -1413,15 +1414,53 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _totalMessagesReceived.get();
}
+ @Override
public long getOldestMessageArrivalTime()
{
- QueueEntry entry = getOldestQueueEntry();
- return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
+ long oldestMessageArrivalTime = -1l;
+
+ while(oldestMessageArrivalTime == -1l)
+ {
+ QueueEntry entry = getEntries().getOldestEntry();
+ if (entry != null)
+ {
+ ServerMessage message = entry.getMessage();
+
+ if(message != null)
+ {
+ try
+ {
+ MessageReference reference = message.newReference();
+ try
+ {
+ oldestMessageArrivalTime = reference.getMessage().getArrivalTime();
+ }
+ finally
+ {
+ reference.release();
+ }
+
+
+ }
+ catch (MessageDeletedException e)
+ {
+ // ignore - the oldest message was deleted after it was discovered - we need to find the new oldest message
+ }
+ }
+ }
+ else
+ {
+ oldestMessageArrivalTime = 0;
+ }
+ }
+ return oldestMessageArrivalTime;
}
- protected QueueEntry getOldestQueueEntry()
+ @Override
+ public long getOldestMessageAge()
{
- return getEntries().next(getEntries().getHead());
+ long oldestMessageArrivalTime = getOldestMessageArrivalTime();
+ return oldestMessageArrivalTime == 0 ? 0 : System.currentTimeMillis() - oldestMessageArrivalTime;
}
public boolean isDeleted()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
index 7ee454ff04..552e2122e1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
@@ -24,13 +24,14 @@ import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject( category = false, type="lvq" )
+@ManagedObject( category = false, type= LastValueQueue.LAST_VALUE_QUEUE_TYPE)
public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X>
{
String LVQ_KEY = "lvqKey";
@ManagedContextDefault( name = "queue.lvqKey" )
String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+ String LAST_VALUE_QUEUE_TYPE = "lvq";
@ManagedAttribute(defaultValue = "${queue.lvqKey}")
String getLvqKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index d3ab8bec32..ac04912a72 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -105,7 +105,7 @@ public enum NotificationCheck
final long thresholdTime = currentTime - maxMessageAge;
final long firstArrivalTime = queue.getOldestMessageArrivalTime();
- if(firstArrivalTime < thresholdTime)
+ if(firstArrivalTime != 0 && firstArrivalTime < thresholdTime)
{
long oldestAge = currentTime - firstArrivalTime;
String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
index 47fd5c2fa3..634aa22928 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.ServerMessage;
-
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.qpid.server.message.ServerMessage;
+
public abstract class OrderedQueueEntryList implements QueueEntryList
{
@@ -195,5 +195,10 @@ public abstract class OrderedQueueEntryList implements QueueEntryList
return 0;
}
+ @Override
+ public QueueEntry getOldestEntry()
+ {
+ return next(getHead());
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 076b6c9e73..d8d6a2ff0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -189,6 +189,21 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
{
}
+
+ @Override
+ public QueueEntry getOldestEntry()
+ {
+ QueueEntry oldest = null;
+ for(PriorityQueueEntrySubList subList : _priorityLists)
+ {
+ QueueEntry subListOldest = subList.getOldestEntry();
+ if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber()))
+ {
+ oldest = subListOldest;
+ }
+ }
+ return oldest;
+ }
}
static class PriorityQueueEntrySubList extends PriorityQueueList
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index d2687a33fd..28dfc73a27 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -34,6 +34,8 @@ public interface QueueEntryList
QueueEntry getHead();
+ QueueEntry getOldestEntry();
+
void entryDeleted(QueueEntry queueEntry);
int getPriorities();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 56fe0bc8e0..aca6f2097b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -23,10 +23,11 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject( category = false, type="sorted" )
+@ManagedObject( category = false, type= SortedQueue.SORTED_QUEUE_TYPE)
public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X>
{
String SORT_KEY = "sortKey";
+ String SORTED_QUEUE_TYPE = "sorted";
@ManagedAttribute( mandatory = true )
String getSortKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index efb5438214..a37713c8b8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -319,6 +319,26 @@ public class SortedQueueEntryList implements QueueEntryList
return _head;
}
+ @Override
+ public QueueEntry getOldestEntry()
+ {
+ QueueEntry oldestEntry = null;
+ QueueEntryIterator iter = iterator();
+ while (iter.advance())
+ {
+ QueueEntry node = iter.getNode();
+ if (node != null && !node.isDeleted())
+ {
+ ServerMessage msg = node.getMessage();
+ if(msg != null && (oldestEntry == null || oldestEntry.getMessage().getMessageNumber() > msg.getMessageNumber()))
+ {
+ oldestEntry = node;
+ }
+ }
+ }
+ return oldestEntry;
+ }
+
protected SortedQueueEntry getRoot()
{
return _root;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 4cbc9ae57b..0def708fed 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -36,9 +36,13 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
@@ -936,6 +940,16 @@ abstract class AbstractQueueTestBase extends QpidTestCase
}
+ public void testOldestMessage()
+ {
+ AMQQueue<?> queue = getQueue();
+ queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null);
+ queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null);
+ queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null);
+
+ assertEquals(10l,queue.getOldestMessageArrivalTime());
+ }
+
private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
@@ -1087,6 +1101,48 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_arguments = arguments;
}
+ protected ServerMessage createMessage(Long id, byte priority, final Map<String,Object> arguments, long arrivalTime)
+ {
+ ServerMessage message = createMessage(id);
+
+ AMQMessageHeader hdr = message.getMessageHeader();
+ when(hdr.getPriority()).thenReturn(priority);
+ when(message.getArrivalTime()).thenReturn(arrivalTime);
+ when(hdr.getHeaderNames()).thenReturn(arguments.keySet());
+ final ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class);
+ when(hdr.containsHeader(nameCaptor.capture())).thenAnswer(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable
+ {
+ return arguments.containsKey(nameCaptor.getValue());
+ }
+ });
+
+ final ArgumentCaptor<Set> namesCaptor = ArgumentCaptor.forClass(Set.class);
+ when(hdr.containsHeaders(namesCaptor.capture())).thenAnswer(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable
+ {
+ return arguments.keySet().containsAll(namesCaptor.getValue());
+ }
+ });
+
+ final ArgumentCaptor<String> nameCaptor2 = ArgumentCaptor.forClass(String.class);
+ when(hdr.getHeader(nameCaptor2.capture())).thenAnswer(new Answer<Object>()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) throws Throwable
+ {
+ return arguments.get(nameCaptor2.getValue());
+ }
+ });
+
+
+ return message;
+
+ }
protected ServerMessage createMessage(Long id)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
new file mode 100644
index 0000000000..fc04af416f
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.Queue;
+
+public class LastValueQueueTest extends AbstractQueueTestBase
+{
+ @Override
+ public void setUp() throws Exception
+ {
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(LastValueQueue.LVQ_KEY, "lvqKey");
+ arguments.put(Queue.TYPE, LastValueQueue.LAST_VALUE_QUEUE_TYPE);
+ setArguments(arguments);
+
+ super.setUp();
+ }
+
+
+ public void testOldestMessage()
+ {
+ AMQQueue<?> queue = getQueue();
+ queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null);
+ assertEquals(10l,queue.getOldestMessageArrivalTime());
+ queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null);
+ assertEquals(10l,queue.getOldestMessageArrivalTime());
+ queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null);
+ assertEquals(100l,queue.getOldestMessageArrivalTime());
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java
new file mode 100644
index 0000000000..83bc500291
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.Queue;
+
+public class SortedQueueTest extends AbstractQueueTestBase
+{
+ @Override
+ public void setUp() throws Exception
+ {
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(SortedQueue.SORT_KEY, "sortKey");
+ arguments.put(Queue.TYPE, SortedQueue.SORTED_QUEUE_TYPE);
+ setArguments(arguments);
+
+ super.setUp();
+ }
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
index 074ca865a6..ef33a240a9 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
@@ -24,6 +24,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
@@ -121,15 +122,27 @@ public class MessageContentServlet extends AbstractServlet
{
if(_messageNumber == message.getMessageNumber())
{
- MessageReference reference = message.newReference();
-
- _mimeType = message.getMessageHeader().getMimeType();
- _size = message.getSize();
- _content = new byte[(int)_size];
- _found = true;
- message.getContent(ByteBuffer.wrap(_content),0);
- reference.release();
- return true;
+ try
+ {
+ MessageReference reference = message.newReference();
+ try
+ {
+ _mimeType = message.getMessageHeader().getMimeType();
+ _size = message.getSize();
+ _content = new byte[(int) _size];
+ _found = true;
+ message.getContent(ByteBuffer.wrap(_content), 0);
+ }
+ finally
+ {
+ reference.release();
+ }
+ return true;
+ }
+ catch (MessageDeletedException e)
+ {
+ // ignore - the message was deleted as we tried too look at it, treat as if no message found
+ }
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
index 531ea1e3c1..9866207234 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
@@ -35,6 +35,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
@@ -328,10 +329,23 @@ public class MessageServlet extends AbstractServlet
{
if(_messageNumber == message.getMessageNumber())
{
- MessageReference reference = message.newReference();
- _messageObject = convertToObject(entry, true);
- reference.release();
- return true;
+ try
+ {
+ MessageReference reference = message.newReference();
+ try
+ {
+ _messageObject = convertToObject(entry, true);
+ }
+ finally
+ {
+ reference.release();
+ }
+ return true;
+ }
+ catch (MessageDeletedException e)
+ {
+ // ignore - the message has been deleted before we got a chance to look at it
+ }
}
}
return false;
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index a1fec6f69e..ca092fe6f8 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -700,4 +700,10 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
{
return _queue.isMessageGroupSharedGroups();
}
+
+ @Override
+ public Long getOldestMessageAge()
+ {
+ return _queue.getOldestMessageAge();
+ }
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index 6f31dc6d04..671920c33d 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -118,6 +118,12 @@ public class QueueMBeanTest extends QpidTestCase
assertStatistic("consumerCount", 3);
}
+ public void testOldestMessageAge() throws Exception
+ {
+ when(_mockQueue.getOldestMessageAge()).thenReturn(3l);
+ assertStatistic("oldestMessageAge", 3l);
+ }
+
/********** Simple Attributes **********/
public void testGetQueueDescription() throws Exception
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
index dbce388054..4032cb78df 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
@@ -20,14 +20,6 @@
*/
package org.apache.qpid.management.common.mbeans;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
-
-import javax.management.JMException;
-import javax.management.MBeanOperationInfo;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,6 +27,15 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import javax.management.JMException;
+import javax.management.MBeanOperationInfo;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperationParameter;
+
/**
* The management interface exposed to allow management of a queue.
* @author Robert J. Greig
@@ -91,6 +92,7 @@ public interface ManagedQueue
static final String ATTR_ALT_EXCHANGE = "AlternateExchange";
static final String ATTR_SHARED_MESSAGE_GROUP = "MessageGroupSharedGroups";
static final String ATTR_MESSAGE_GROUP_KEY = "MessageGroupKey";
+ static final String ATTR_OLDEST_MESSAGE_AGE = "OldestMessageAge";
//All attribute names constant
static final List<String> QUEUE_ATTRIBUTES
@@ -120,7 +122,8 @@ public interface ManagedQueue
ATTR_EXCLUSIVE,
ATTR_ALT_EXCHANGE,
ATTR_SHARED_MESSAGE_GROUP,
- ATTR_MESSAGE_GROUP_KEY
+ ATTR_MESSAGE_GROUP_KEY,
+ ATTR_OLDEST_MESSAGE_AGE
))));
/**
@@ -317,7 +320,7 @@ public interface ManagedQueue
*
* @since Qpid JMX API 1.6
* @return Capacity below which flow resumes in bytes
- * @throws IOExceptionm
+ * @throws IOException
*/
Long getFlowResumeCapacity() throws IOException;
@@ -404,6 +407,13 @@ public interface ManagedQueue
@MBeanAttribute(name="MessageGroupSharedGroups", description="If set indicates that while two messages of the same group cannot be processed by different consumers concurrently, no guarantee is made that subsequent messages are always sent to the same consumer")
boolean isMessageGroupSharedGroups();
+ /**
+ * Gets the arrival time of the oldest message in the queue
+ * @since Qpid JMX API 2.8
+ */
+ @MBeanAttribute(name= ATTR_OLDEST_MESSAGE_AGE, description="the age (in milliseconds since the epoch of the oldest message in the queue (or 0 if the queue is empty)")
+ Long getOldestMessageAge();
+
//********** Operations *****************//
diff --git a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index b1519a27b6..2b51711664 100644
--- a/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.management.common.mbeans;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
+import java.io.IOException;
import javax.management.MBeanOperationInfo;
-import java.io.IOException;
+
+import org.apache.qpid.management.common.mbeans.annotations.MBeanAttribute;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanOperation;
/**
* Interface for the ServerInformation MBean
@@ -46,7 +47,7 @@ public interface ServerInformation
* Qpid JMX API 1.1 can be assumed.
*/
int QPID_JMX_API_MAJOR_VERSION = 2;
- int QPID_JMX_API_MINOR_VERSION = 7;
+ int QPID_JMX_API_MINOR_VERSION = 8;
/**