summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-07 13:14:06 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-07 13:14:06 +0000
commit400ffb496c30a2a6554af7cf7739db40b497e5c2 (patch)
tree153a0795d2cc0f0d32b3dcb630d5f82e1f233b6f /qpid/java/broker-core/src
parent87456620af31532eb5af81c0207e7533ae67fb39 (diff)
downloadqpid-python-400ffb496c30a2a6554af7cf7739db40b497e5c2.tar.gz
QPID-5970 : [Java Broker] Expose "age of oldest message on queue" to management apis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616487 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java47
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java20
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java56
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java53
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java40
14 files changed, 278 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 198c0a1cb9..d397dd57b6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -161,7 +161,10 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
private Reference(final AbstractServerMessageImpl<X, T> message)
{
_message = message;
- _message.incrementReference();
+ if(!_message.incrementReference())
+ {
+ throw new MessageDeletedException(message.getMessageNumber());
+ }
}
public X getMessage()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java
new file mode 100644
index 0000000000..daa73720e5
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public class MessageDeletedException extends RuntimeException
+{
+ MessageDeletedException(final long messageNumber)
+ {
+ super("The message with id " + messageNumber + " has already been deleted, no new reference can be taken");
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 4e216925e4..073740f07c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -210,4 +210,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getUnacknowledgedMessages();
+ @ManagedStatistic
+ long getOldestMessageAge();
+
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index c889fa7740..54f3c4de09 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -57,6 +57,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
@@ -1413,15 +1414,53 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _totalMessagesReceived.get();
}
+ @Override
public long getOldestMessageArrivalTime()
{
- QueueEntry entry = getOldestQueueEntry();
- return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
+ long oldestMessageArrivalTime = -1l;
+
+ while(oldestMessageArrivalTime == -1l)
+ {
+ QueueEntry entry = getEntries().getOldestEntry();
+ if (entry != null)
+ {
+ ServerMessage message = entry.getMessage();
+
+ if(message != null)
+ {
+ try
+ {
+ MessageReference reference = message.newReference();
+ try
+ {
+ oldestMessageArrivalTime = reference.getMessage().getArrivalTime();
+ }
+ finally
+ {
+ reference.release();
+ }
+
+
+ }
+ catch (MessageDeletedException e)
+ {
+ // ignore - the oldest message was deleted after it was discovered - we need to find the new oldest message
+ }
+ }
+ }
+ else
+ {
+ oldestMessageArrivalTime = 0;
+ }
+ }
+ return oldestMessageArrivalTime;
}
- protected QueueEntry getOldestQueueEntry()
+ @Override
+ public long getOldestMessageAge()
{
- return getEntries().next(getEntries().getHead());
+ long oldestMessageArrivalTime = getOldestMessageArrivalTime();
+ return oldestMessageArrivalTime == 0 ? 0 : System.currentTimeMillis() - oldestMessageArrivalTime;
}
public boolean isDeleted()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
index 7ee454ff04..552e2122e1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java
@@ -24,13 +24,14 @@ import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject( category = false, type="lvq" )
+@ManagedObject( category = false, type= LastValueQueue.LAST_VALUE_QUEUE_TYPE)
public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X>
{
String LVQ_KEY = "lvqKey";
@ManagedContextDefault( name = "queue.lvqKey" )
String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+ String LAST_VALUE_QUEUE_TYPE = "lvq";
@ManagedAttribute(defaultValue = "${queue.lvqKey}")
String getLvqKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index d3ab8bec32..ac04912a72 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -105,7 +105,7 @@ public enum NotificationCheck
final long thresholdTime = currentTime - maxMessageAge;
final long firstArrivalTime = queue.getOldestMessageArrivalTime();
- if(firstArrivalTime < thresholdTime)
+ if(firstArrivalTime != 0 && firstArrivalTime < thresholdTime)
{
long oldestAge = currentTime - firstArrivalTime;
String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
index 47fd5c2fa3..634aa22928 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.ServerMessage;
-
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.qpid.server.message.ServerMessage;
+
public abstract class OrderedQueueEntryList implements QueueEntryList
{
@@ -195,5 +195,10 @@ public abstract class OrderedQueueEntryList implements QueueEntryList
return 0;
}
+ @Override
+ public QueueEntry getOldestEntry()
+ {
+ return next(getHead());
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index 076b6c9e73..d8d6a2ff0b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -189,6 +189,21 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList
{
}
+
+ @Override
+ public QueueEntry getOldestEntry()
+ {
+ QueueEntry oldest = null;
+ for(PriorityQueueEntrySubList subList : _priorityLists)
+ {
+ QueueEntry subListOldest = subList.getOldestEntry();
+ if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber()))
+ {
+ oldest = subListOldest;
+ }
+ }
+ return oldest;
+ }
}
static class PriorityQueueEntrySubList extends PriorityQueueList
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index d2687a33fd..28dfc73a27 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -34,6 +34,8 @@ public interface QueueEntryList
QueueEntry getHead();
+ QueueEntry getOldestEntry();
+
void entryDeleted(QueueEntry queueEntry);
int getPriorities();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 56fe0bc8e0..aca6f2097b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -23,10 +23,11 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
-@ManagedObject( category = false, type="sorted" )
+@ManagedObject( category = false, type= SortedQueue.SORTED_QUEUE_TYPE)
public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X>
{
String SORT_KEY = "sortKey";
+ String SORTED_QUEUE_TYPE = "sorted";
@ManagedAttribute( mandatory = true )
String getSortKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index efb5438214..a37713c8b8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -319,6 +319,26 @@ public class SortedQueueEntryList implements QueueEntryList
return _head;
}
+ @Override
+ public QueueEntry getOldestEntry()
+ {
+ QueueEntry oldestEntry = null;
+ QueueEntryIterator iter = iterator();
+ while (iter.advance())
+ {
+ QueueEntry node = iter.getNode();
+ if (node != null && !node.isDeleted())
+ {
+ ServerMessage msg = node.getMessage();
+ if(msg != null && (oldestEntry == null || oldestEntry.getMessage().getMessageNumber() > msg.getMessageNumber()))
+ {
+ oldestEntry = node;
+ }
+ }
+ }
+ return oldestEntry;
+ }
+
protected SortedQueueEntry getRoot()
{
return _root;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 4cbc9ae57b..0def708fed 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -36,9 +36,13 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
@@ -936,6 +940,16 @@ abstract class AbstractQueueTestBase extends QpidTestCase
}
+ public void testOldestMessage()
+ {
+ AMQQueue<?> queue = getQueue();
+ queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null);
+ queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null);
+ queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null);
+
+ assertEquals(10l,queue.getOldestMessageArrivalTime());
+ }
+
private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
@@ -1087,6 +1101,48 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_arguments = arguments;
}
+ protected ServerMessage createMessage(Long id, byte priority, final Map<String,Object> arguments, long arrivalTime)
+ {
+ ServerMessage message = createMessage(id);
+
+ AMQMessageHeader hdr = message.getMessageHeader();
+ when(hdr.getPriority()).thenReturn(priority);
+ when(message.getArrivalTime()).thenReturn(arrivalTime);
+ when(hdr.getHeaderNames()).thenReturn(arguments.keySet());
+ final ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class);
+ when(hdr.containsHeader(nameCaptor.capture())).thenAnswer(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable
+ {
+ return arguments.containsKey(nameCaptor.getValue());
+ }
+ });
+
+ final ArgumentCaptor<Set> namesCaptor = ArgumentCaptor.forClass(Set.class);
+ when(hdr.containsHeaders(namesCaptor.capture())).thenAnswer(new Answer<Boolean>()
+ {
+ @Override
+ public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable
+ {
+ return arguments.keySet().containsAll(namesCaptor.getValue());
+ }
+ });
+
+ final ArgumentCaptor<String> nameCaptor2 = ArgumentCaptor.forClass(String.class);
+ when(hdr.getHeader(nameCaptor2.capture())).thenAnswer(new Answer<Object>()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) throws Throwable
+ {
+ return arguments.get(nameCaptor2.getValue());
+ }
+ });
+
+
+ return message;
+
+ }
protected ServerMessage createMessage(Long id)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
new file mode 100644
index 0000000000..fc04af416f
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.Queue;
+
+public class LastValueQueueTest extends AbstractQueueTestBase
+{
+ @Override
+ public void setUp() throws Exception
+ {
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(LastValueQueue.LVQ_KEY, "lvqKey");
+ arguments.put(Queue.TYPE, LastValueQueue.LAST_VALUE_QUEUE_TYPE);
+ setArguments(arguments);
+
+ super.setUp();
+ }
+
+
+ public void testOldestMessage()
+ {
+ AMQQueue<?> queue = getQueue();
+ queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null);
+ assertEquals(10l,queue.getOldestMessageArrivalTime());
+ queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null);
+ assertEquals(10l,queue.getOldestMessageArrivalTime());
+ queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null);
+ assertEquals(100l,queue.getOldestMessageArrivalTime());
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java
new file mode 100644
index 0000000000..83bc500291
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.Queue;
+
+public class SortedQueueTest extends AbstractQueueTestBase
+{
+ @Override
+ public void setUp() throws Exception
+ {
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(SortedQueue.SORT_KEY, "sortKey");
+ arguments.put(Queue.TYPE, SortedQueue.SORTED_QUEUE_TYPE);
+ setArguments(arguments);
+
+ super.setUp();
+ }
+}