diff options
| author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-15 15:44:01 +0000 |
|---|---|---|
| committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-15 15:44:01 +0000 |
| commit | d2d92311eb4208cb24959f6caf64fbe9f3cf464f (patch) | |
| tree | 43d91ad1a27bedfb7cd19790f5870c193202dec0 /java/broker/src | |
| parent | 2221875e2742c75ee5728e4a458740e5ff98cc97 (diff) | |
| download | qpid-python-d2d92311eb4208cb24959f6caf64fbe9f3cf464f.tar.gz | |
- DeliveryManager.getMessage is reimplmented because the ConcurrentLinkedMessageQueueAtomicSize.toArray is not implemented
- Not creating lock while doing startMovingMessages.Just setting movingMessage to true, because that can stop the sync delivery.
- And some tidy up of the code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518669 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
3 files changed, 68 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 78f144703b..f17a6fb60a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -235,37 +235,40 @@ public class AMQQueue implements Managable, Comparable return _deliveryMgr.getMessages(); } + /** + * Returns messages within the given range of message Ids + * @param fromMessageId + * @param toMessageId + * @return List of messages + */ + public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId) + { + return _deliveryMgr.getMessages(fromMessageId, toMessageId); + } + public long getQueueDepth() { return _deliveryMgr.getTotalMessageSize(); } - /** * @param messageId - * * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist. */ public AMQMessage getMessageOnTheQueue(long messageId) { - List<AMQMessage> list = getMessagesOnTheQueue(); - AMQMessage msg = null; - for (AMQMessage message : list) + List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId); + if (list == null || list.size() == 0) { - if (message.getMessageId() == messageId) - { - msg = message; - break; - } + return null; } - - return msg; + return list.get(0); } /** * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for - * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message - * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these + * moving messages (stop the async delivery) - get all the messages available in the given message + * id range - setup the other queue for moving messages (stop the async delivery) - send these * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful, * remove messages from this queue - remove locks from both queues and start async delivery * @@ -282,24 +285,7 @@ public class AMQQueue implements Managable, Comparable try { startMovingMessages(); - List<AMQMessage> list = getMessagesOnTheQueue(); - List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); - int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1); - - // Run this loop till you find all the messages or the list has no more messages - for (AMQMessage message : list) - { - long msgId = message.getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) - { - foundMessagesList.add(message); - } - // break the loop as soon as messages to be removed are found - if (foundMessagesList.size() == maxMessageCountToBeMoved) - { - break; - } - } + List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId); // move messages to another queue anotherQueue.startMovingMessages(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 067b6b138b..879080e10c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -208,15 +208,61 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - + /** + * Returns all the messages in the Queue + * @return List of messages + */ public List<AMQMessage> getMessages() { _lock.lock(); - ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages); + List<AMQMessage> list = new ArrayList<AMQMessage>(); + + for (AMQMessage message : _messages) + { + list.add(message); + } _lock.unlock(); + return list; } + /** + * Returns messages within the range of given messageIds + * @param fromMessageId + * @param toMessageId + * @return + */ + public List<AMQMessage> getMessages(long fromMessageId, long toMessageId) + { + if (fromMessageId <= 0 || toMessageId <= 0) + { + return null; + } + + long maxMessageCount = toMessageId - fromMessageId + 1; + + _lock.lock(); + + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); + + for (AMQMessage message : _messages) + { + long msgId = message.getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(message); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } + } + _lock.unlock(); + + return foundMessagesList; + } + public void populatePreDeliveryQueue(Subscription subscription) { if (_log.isTraceEnabled()) @@ -294,7 +340,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public void startMovingMessages() { - _lock.lock(); _movingMessages.set(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 5b77951dfd..10ba48552c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -86,6 +86,8 @@ interface DeliveryManager List<AMQMessage> getMessages(); + List<AMQMessage> getMessages(long fromMessageId, long toMessageId); + void populatePreDeliveryQueue(Subscription subscription); boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException; |
