diff options
Diffstat (limited to 'java/broker')
3 files changed, 68 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 78f144703b..f17a6fb60a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -235,37 +235,40 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + /** + * Returns messages within the given range of message Ids + * @param fromMessageId + * @param toMessageId + * @return List of messages + */ + public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) + { + return _deliveryMgr.getMessages(fromMessageId, toMessageId); + } + public long getQueueDepth() { return _deliveryMgr.getTotalMessageSize(); } - /** * @param messageId - * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) { - List<AMQMessage> list = getMessagesOnTheQueue(); - AMQMessage msg = null; - for (AMQMessage message : list) + List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId); + if (list == null || list.size() == 0) { - if (message.getMessageId() == messageId) - { - msg = message; - break; - } + return null; } - - return msg; + return list.get(0); } /** * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message - * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these + * moving messages (stop the async delivery) - get all the messages available in the given message + * id range - setup the other queue for moving messages (stop the async delivery) - send these * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, * remove messages from this queue - remove locks from both queues and start async delivery * @@ -282,24 +285,7 @@ public class AMQQueue implements Managable, Comparable try { startMovingMessages(); - List<AMQMessage> list = getMessagesOnTheQueue(); - List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); - int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1); - - // Run this loop till you find all the messages or the list has no more messages - for (AMQMessage message : list) - { - long msgId = message.getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) - { - foundMessagesList.add(message); - } - // break the loop as soon as messages to be removed are found - if (foundMessagesList.size() == maxMessageCountToBeMoved) - { - break; - } - } + List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); // move messages to another queue anotherQueue.startMovingMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 067b6b138b..879080e10c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -208,15 +208,61 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - + /** + * Returns all the messages in the Queue + * @return List of messages + */ public List<AMQMessage> getMessages() { _lock.lock(); - ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages); + List<AMQMessage> list = new ArrayList<AMQMessage>(); + + for (AMQMessage message : _messages) + { + list.add(message); + } _lock.unlock(); + return list; } + /** + * Returns messages within the range of given messageIds + * @param fromMessageId + * @param toMessageId + * @return + */ + public List<AMQMessage> getMessages(long fromMessageId, long toMessageId) + { + if (fromMessageId <= 0 || toMessageId <= 0) + { + return null; + } + + long maxMessageCount = toMessageId - fromMessageId + 1; + + _lock.lock(); + + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); + + for (AMQMessage message : _messages) + { + long msgId = message.getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(message); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } + } + _lock.unlock(); + + return foundMessagesList; + } + public void populatePreDeliveryQueue(Subscription subscription) { if (_log.isTraceEnabled()) @@ -294,7 +340,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public void startMovingMessages() { - _lock.lock(); _movingMessages.set(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 5b77951dfd..10ba48552c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -86,6 +86,8 @@ interface DeliveryManager List<AMQMessage> getMessages(); + List<AMQMessage> getMessages(long fromMessageId, long toMessageId); + void populatePreDeliveryQueue(Subscription subscription); boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; |
