diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-07 13:14:06 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-07 13:14:06 +0000 |
| commit | 400ffb496c30a2a6554af7cf7739db40b497e5c2 (patch) | |
| tree | 153a0795d2cc0f0d32b3dcb630d5f82e1f233b6f /qpid/java/broker-core/src | |
| parent | 87456620af31532eb5af81c0207e7533ae67fb39 (diff) | |
| download | qpid-python-400ffb496c30a2a6554af7cf7739db40b497e5c2.tar.gz | |
QPID-5970 : [Java Broker] Expose "age of oldest message on queue" to management apis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616487 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
14 files changed, 278 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 198c0a1cb9..d397dd57b6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -161,7 +161,10 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI private Reference(final AbstractServerMessageImpl<X, T> message) { _message = message; - _message.incrementReference(); + if(!_message.incrementReference()) + { + throw new MessageDeletedException(message.getMessageNumber()); + } } public X getMessage() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java new file mode 100644 index 0000000000..daa73720e5 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +public class MessageDeletedException extends RuntimeException +{ + MessageDeletedException(final long messageNumber) + { + super("The message with id " + messageNumber + " has already been deleted, no new reference can be taken"); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 4e216925e4..073740f07c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -210,4 +210,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedStatistic long getUnacknowledgedMessages(); + @ManagedStatistic + long getOldestMessageAge(); + + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index c889fa7740..54f3c4de09 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -57,6 +57,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDeletedException; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; @@ -1413,15 +1414,53 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> return _totalMessagesReceived.get(); } + @Override public long getOldestMessageArrivalTime() { - QueueEntry entry = getOldestQueueEntry(); - return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); + long oldestMessageArrivalTime = -1l; + + while(oldestMessageArrivalTime == -1l) + { + QueueEntry entry = getEntries().getOldestEntry(); + if (entry != null) + { + ServerMessage message = entry.getMessage(); + + if(message != null) + { + try + { + MessageReference reference = message.newReference(); + try + { + oldestMessageArrivalTime = reference.getMessage().getArrivalTime(); + } + finally + { + reference.release(); + } + + + } + catch (MessageDeletedException e) + { + // ignore - the oldest message was deleted after it was discovered - we need to find the new oldest message + } + } + } + else + { + oldestMessageArrivalTime = 0; + } + } + return oldestMessageArrivalTime; } - protected QueueEntry getOldestQueueEntry() + @Override + public long getOldestMessageAge() { - return getEntries().next(getEntries().getHead()); + long oldestMessageArrivalTime = getOldestMessageArrivalTime(); + return oldestMessageArrivalTime == 0 ? 0 : System.currentTimeMillis() - oldestMessageArrivalTime; } public boolean isDeleted() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java index 7ee454ff04..552e2122e1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueue.java @@ -24,13 +24,14 @@ import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.model.ManagedObject; -@ManagedObject( category = false, type="lvq" ) +@ManagedObject( category = false, type= LastValueQueue.LAST_VALUE_QUEUE_TYPE) public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X> { String LVQ_KEY = "lvqKey"; @ManagedContextDefault( name = "queue.lvqKey" ) String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + String LAST_VALUE_QUEUE_TYPE = "lvq"; @ManagedAttribute(defaultValue = "${queue.lvqKey}") String getLvqKey(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index d3ab8bec32..ac04912a72 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -105,7 +105,7 @@ public enum NotificationCheck final long thresholdTime = currentTime - maxMessageAge; final long firstArrivalTime = queue.getOldestMessageArrivalTime(); - if(firstArrivalTime < thresholdTime) + if(firstArrivalTime != 0 && firstArrivalTime < thresholdTime) { long oldestAge = currentTime - firstArrivalTime; String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached."; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java index 47fd5c2fa3..634aa22928 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.message.ServerMessage; - import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.qpid.server.message.ServerMessage; + public abstract class OrderedQueueEntryList implements QueueEntryList { @@ -195,5 +195,10 @@ public abstract class OrderedQueueEntryList implements QueueEntryList return 0; } + @Override + public QueueEntry getOldestEntry() + { + return next(getHead()); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index 076b6c9e73..d8d6a2ff0b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -189,6 +189,21 @@ abstract public class PriorityQueueList extends OrderedQueueEntryList { } + + @Override + public QueueEntry getOldestEntry() + { + QueueEntry oldest = null; + for(PriorityQueueEntrySubList subList : _priorityLists) + { + QueueEntry subListOldest = subList.getOldestEntry(); + if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber())) + { + oldest = subListOldest; + } + } + return oldest; + } } static class PriorityQueueEntrySubList extends PriorityQueueList diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index d2687a33fd..28dfc73a27 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -34,6 +34,8 @@ public interface QueueEntryList QueueEntry getHead(); + QueueEntry getOldestEntry(); + void entryDeleted(QueueEntry queueEntry); int getPriorities(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 56fe0bc8e0..aca6f2097b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -23,10 +23,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedObject; -@ManagedObject( category = false, type="sorted" ) +@ManagedObject( category = false, type= SortedQueue.SORTED_QUEUE_TYPE) public interface SortedQueue<X extends SortedQueue<X>> extends AMQQueue<X> { String SORT_KEY = "sortKey"; + String SORTED_QUEUE_TYPE = "sorted"; @ManagedAttribute( mandatory = true ) String getSortKey(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index efb5438214..a37713c8b8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -319,6 +319,26 @@ public class SortedQueueEntryList implements QueueEntryList return _head; } + @Override + public QueueEntry getOldestEntry() + { + QueueEntry oldestEntry = null; + QueueEntryIterator iter = iterator(); + while (iter.advance()) + { + QueueEntry node = iter.getNode(); + if (node != null && !node.isDeleted()) + { + ServerMessage msg = node.getMessage(); + if(msg != null && (oldestEntry == null || oldestEntry.getMessage().getMessageNumber() > msg.getMessageNumber())) + { + oldestEntry = node; + } + } + } + return oldestEntry; + } + protected SortedQueueEntry getRoot() { return _root; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 4cbc9ae57b..0def708fed 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -36,9 +36,13 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.log4j.Logger; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; @@ -936,6 +940,16 @@ abstract class AbstractQueueTestBase extends QpidTestCase } + public void testOldestMessage() + { + AMQQueue<?> queue = getQueue(); + queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null); + queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null); + queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null); + + assertEquals(10l,queue.getOldestMessageArrivalTime()); + } + private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration) { final List<QueueEntry> entries = new ArrayList<>(); @@ -1087,6 +1101,48 @@ abstract class AbstractQueueTestBase extends QpidTestCase _arguments = arguments; } + protected ServerMessage createMessage(Long id, byte priority, final Map<String,Object> arguments, long arrivalTime) + { + ServerMessage message = createMessage(id); + + AMQMessageHeader hdr = message.getMessageHeader(); + when(hdr.getPriority()).thenReturn(priority); + when(message.getArrivalTime()).thenReturn(arrivalTime); + when(hdr.getHeaderNames()).thenReturn(arguments.keySet()); + final ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class); + when(hdr.containsHeader(nameCaptor.capture())).thenAnswer(new Answer<Boolean>() + { + @Override + public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable + { + return arguments.containsKey(nameCaptor.getValue()); + } + }); + + final ArgumentCaptor<Set> namesCaptor = ArgumentCaptor.forClass(Set.class); + when(hdr.containsHeaders(namesCaptor.capture())).thenAnswer(new Answer<Boolean>() + { + @Override + public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable + { + return arguments.keySet().containsAll(namesCaptor.getValue()); + } + }); + + final ArgumentCaptor<String> nameCaptor2 = ArgumentCaptor.forClass(String.class); + when(hdr.getHeader(nameCaptor2.capture())).thenAnswer(new Answer<Object>() + { + @Override + public Object answer(final InvocationOnMock invocationOnMock) throws Throwable + { + return arguments.get(nameCaptor2.getValue()); + } + }); + + + return message; + + } protected ServerMessage createMessage(Long id) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java new file mode 100644 index 0000000000..fc04af416f --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.model.Queue; + +public class LastValueQueueTest extends AbstractQueueTestBase +{ + @Override + public void setUp() throws Exception + { + Map<String,Object> arguments = new HashMap<>(); + arguments.put(LastValueQueue.LVQ_KEY, "lvqKey"); + arguments.put(Queue.TYPE, LastValueQueue.LAST_VALUE_QUEUE_TYPE); + setArguments(arguments); + + super.setUp(); + } + + + public void testOldestMessage() + { + AMQQueue<?> queue = getQueue(); + queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("lvqKey", (Object) "Z"), 10l), null); + assertEquals(10l,queue.getOldestMessageArrivalTime()); + queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("lvqKey", (Object) "M"), 100l), null); + assertEquals(10l,queue.getOldestMessageArrivalTime()); + queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("lvqKey", (Object) "Z"), 1000l), null); + assertEquals(100l,queue.getOldestMessageArrivalTime()); + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java new file mode 100644 index 0000000000..83bc500291 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueTest.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.model.Queue; + +public class SortedQueueTest extends AbstractQueueTestBase +{ + @Override + public void setUp() throws Exception + { + Map<String,Object> arguments = new HashMap<>(); + arguments.put(SortedQueue.SORT_KEY, "sortKey"); + arguments.put(Queue.TYPE, SortedQueue.SORTED_QUEUE_TYPE); + setArguments(arguments); + + super.setUp(); + } +} |
