From d2d92311eb4208cb24959f6caf64fbe9f3cf464f Mon Sep 17 00:00:00 2001 From: Bhupendra Bhusman Bhardwaj Date: Thu, 15 Mar 2007 15:44:01 +0000 Subject: - DeliveryManager.getMessage is reimplmented because the ConcurrentLinkedMessageQueueAtomicSize.toArray is not implemented - Not creating lock while doing startMovingMessages.Just setting movingMessage to true, because that can stop the sync delivery. - And some tidy up of the code. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518669 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/queue/AMQQueue.java | 50 ++++++++------------- .../queue/ConcurrentSelectorDeliveryManager.java | 51 ++++++++++++++++++++-- .../apache/qpid/server/queue/DeliveryManager.java | 2 + 3 files changed, 68 insertions(+), 35 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 78f144703b..f17a6fb60a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -235,37 +235,40 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + /** + * Returns messages within the given range of message Ids + * @param fromMessageId + * @param toMessageId + * @return List of messages + */ + public List getMessagesOnTheQueue(long fromMessageId, long toMessageId) + { + return _deliveryMgr.getMessages(fromMessageId, toMessageId); + } + public long getQueueDepth() { return _deliveryMgr.getTotalMessageSize(); } - /** * @param messageId - * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) { - List list = getMessagesOnTheQueue(); - AMQMessage msg = null; - for (AMQMessage message : list) + List list = getMessagesOnTheQueue(messageId, messageId); + if (list == null || list.size() == 0) { - if (message.getMessageId() == messageId) - { - msg = message; - break; - } + return null; } - - return msg; + return list.get(0); } /** * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message - * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these + * moving messages (stop the async delivery) - get all the messages available in the given message + * id range - setup the other queue for moving messages (stop the async delivery) - send these * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, * remove messages from this queue - remove locks from both queues and start async delivery * @@ -282,24 +285,7 @@ public class AMQQueue implements Managable, Comparable try { startMovingMessages(); - List list = getMessagesOnTheQueue(); - List foundMessagesList = new ArrayList(); - int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1); - - // Run this loop till you find all the messages or the list has no more messages - for (AMQMessage message : list) - { - long msgId = message.getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) - { - foundMessagesList.add(message); - } - // break the loop as soon as messages to be removed are found - if (foundMessagesList.size() == maxMessageCountToBeMoved) - { - break; - } - } + List foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); // move messages to another queue anotherQueue.startMovingMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 067b6b138b..879080e10c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -208,15 +208,61 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - + /** + * Returns all the messages in the Queue + * @return List of messages + */ public List getMessages() { _lock.lock(); - ArrayList list = new ArrayList(_messages); + List list = new ArrayList(); + + for (AMQMessage message : _messages) + { + list.add(message); + } _lock.unlock(); + return list; } + /** + * Returns messages within the range of given messageIds + * @param fromMessageId + * @param toMessageId + * @return + */ + public List getMessages(long fromMessageId, long toMessageId) + { + if (fromMessageId <= 0 || toMessageId <= 0) + { + return null; + } + + long maxMessageCount = toMessageId - fromMessageId + 1; + + _lock.lock(); + + List foundMessagesList = new ArrayList(); + + for (AMQMessage message : _messages) + { + long msgId = message.getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(message); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } + } + _lock.unlock(); + + return foundMessagesList; + } + public void populatePreDeliveryQueue(Subscription subscription) { if (_log.isTraceEnabled()) @@ -294,7 +340,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public void startMovingMessages() { - _lock.lock(); _movingMessages.set(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 5b77951dfd..10ba48552c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -86,6 +86,8 @@ interface DeliveryManager List getMessages(); + List getMessages(long fromMessageId, long toMessageId); + void populatePreDeliveryQueue(Subscription subscription); boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; -- cgit v1.2.1