diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-05-06 09:26:37 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-05-06 09:26:37 +0000 |
| commit | 62c72d3efd475f1f9b3e82deb05fdedf0223e498 (patch) | |
| tree | bc3d82e87d262a0a0e5eebded394396142f8be51 /java | |
| parent | 187caa499197542fadc01daf2ce48c8d7dce470f (diff) | |
| download | qpid-python-62c72d3efd475f1f9b3e82deb05fdedf0223e498.tar.gz | |
Merged revisions 652388-652389,652399,652567-652568,653416 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x
........
r652388 | ritchiem | 2008-04-30 15:40:18 +0100 (Wed, 30 Apr 2008) | 2 lines
QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads.
........
r652389 | ritchiem | 2008-04-30 15:40:45 +0100 (Wed, 30 Apr 2008) | 1 line
QPID-887 : Renamed QueueHouseKeeping threads so they can be identified in thread dump. Named Queue-housekeeping-<virtualhost name>
........
r652399 | ritchiem | 2008-04-30 16:32:42 +0100 (Wed, 30 Apr 2008) | 1 line
QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired.
........
r652567 | aidan | 2008-05-01 17:32:20 +0100 (Thu, 01 May 2008) | 1 line
QPID-994 Dont wait for attain state as connection is closed by we get CloseOk
........
r652568 | aidan | 2008-05-01 17:35:09 +0100 (Thu, 01 May 2008) | 1 line
QPID-1001 dont set the expiration time if TTL is 0
........
r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line
QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@653720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
4 files changed, 130 insertions, 75 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 74169a19bb..1314b2b715 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -469,7 +469,7 @@ public class AMQChannel synchronized (_unacknowledgedMessageMap.getLock()) { - _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag)); + _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap)); checkSuspension(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index df7cecc940..0112d3b388 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -34,13 +34,18 @@ public class UnacknowledgedMessage public final long deliveryTag; private boolean _queueDeleted; + private final UnacknowledgedMessageMap _unacknowledgeMessageMap; - public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag) + public UnacknowledgedMessage(QueueEntry entry, + AMQShortString consumerTag, + long deliveryTag, + final UnacknowledgedMessageMap unacknowledgedMessageMap) { this.entry = entry; this.consumerTag = consumerTag; this.deliveryTag = deliveryTag; + _unacknowledgeMessageMap = unacknowledgedMessageMap; } public String toString() @@ -60,12 +65,20 @@ public class UnacknowledgedMessage public void discard(StoreContext storeContext) throws AMQException { - if (entry.getQueue() != null) + synchronized(_unacknowledgeMessageMap) { - entry.getQueue().dequeue(storeContext, entry); + if(_unacknowledgeMessageMap.contains(deliveryTag)) + { + + if (entry.getQueue() != null) + { + entry.getQueue().dequeue(storeContext, entry); + } + //if the queue is null then the message is waiting to be acked, but has been removed. + entry.getMessage().decrementReference(storeContext); + } } - //if the queue is null then the message is waiting to be acked, but has been removed. - entry.getMessage().decrementReference(storeContext); + } public AMQMessage getMessage() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 7dfcae95c3..cf607548f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -87,10 +87,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private final Object _queueHeadLock = new Object(); private String _processingThreadName = ""; - - /** Used by any reaping thread to purge messages */ - private StoreContext _reapingStoreContext = new StoreContext(); - ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -218,22 +214,32 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeExpired() throws AMQException { _lock.lock(); - - - for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) + try { - QueueEntry entry = iter.next(); - if(entry.expired()) + // New Context to for dealing with the MessageStore. + StoreContext context = new StoreContext(); + + for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();) { - // fixme: Currently we have to update the total byte size here for the data in the queue - _totalMessageSize.addAndGet(-entry.getSize()); - _queue.dequeue(_reapingStoreContext,entry); - iter.remove(); - } - } + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); + // Remove the message from the queue in the MessageStore + _queue.dequeue(context,entry); - _lock.unlock(); + // This queue nolonger needs a reference to this message + entry.getMessage().decrementReference(context); + iter.remove(); + } + } + } + finally + { + _lock.unlock(); + } } /** @return the state of the async processor. */ @@ -249,14 +255,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ public List<QueueEntry> getMessages() { - _lock.lock(); - List<QueueEntry> list = new ArrayList<QueueEntry>(); + List<QueueEntry> list = new ArrayList<QueueEntry>(); - for (QueueEntry entry : _messages) + _lock.lock(); + try { - list.add(entry); + for (QueueEntry entry : _messages) + { + list.add(entry); + } + } + finally + { + _lock.unlock(); } - _lock.unlock(); return list; } @@ -278,24 +290,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long maxMessageCount = toMessageId - fromMessageId + 1; - _lock.lock(); - List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>(); - - for (QueueEntry entry : _messages) + _lock.lock(); + try { - long msgId = entry.getMessage().getMessageId(); - if (msgId >= fromMessageId && msgId <= toMessageId) + for (QueueEntry entry : _messages) { - foundMessagesList.add(entry); - } - // break if the no of messages are found - if (foundMessagesList.size() == maxMessageCount) - { - break; + long msgId = entry.getMessage().getMessageId(); + if (msgId >= fromMessageId && msgId <= toMessageId) + { + foundMessagesList.add(entry); + } + // break if the no of messages are found + if (foundMessagesList.size() == maxMessageCount) + { + break; + } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } return foundMessagesList; } @@ -445,45 +461,62 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _lock.lock(); - QueueEntry entry = _messages.poll(); - - if (entry != null) + try { - queue.dequeue(storeContext, entry); + QueueEntry entry = _messages.poll(); - _totalMessageSize.addAndGet(-entry.getSize()); + if (entry != null) + { + queue.dequeue(storeContext, entry); - //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. - entry.getMessage().decrementReference(storeContext); + _totalMessageSize.addAndGet(-entry.getSize()); - } + //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. + entry.getMessage().decrementReference(storeContext); - _lock.unlock(); + } + } + finally + { + _lock.unlock(); + } } public long clearAllMessages(StoreContext storeContext) throws AMQException { long count = 0; - _lock.lock(); - synchronized (_queueHeadLock) + _lock.lock(); + try { - QueueEntry entry = getNextMessage(); - while (entry != null) + synchronized (_queueHeadLock) { - //and remove it - _messages.poll(); + QueueEntry entry = getNextMessage(); - _queue.dequeue(storeContext, entry); + // todo: note: why do we need this? Why not reuse the passed 'storeContext' + //Create a new StoreContext for decrementing the References + StoreContext context = new StoreContext(); + + while (entry != null) + { + //and remove it + _messages.poll(); - entry.getMessage().decrementReference(_reapingStoreContext); + // todo: NOTE: Why is this a different context to the new local 'context'? + _queue.dequeue(storeContext, entry); - entry = getNextMessage(); - count++; + entry.getMessage().decrementReference(context); + + entry = getNextMessage(); + count++; + } + _totalMessageSize.set(0L); } - _totalMessageSize.set(0L); } - _lock.unlock(); + finally + { + _lock.unlock(); + } return count; } @@ -518,10 +551,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { _totalMessageSize.addAndGet(-entry.getSize()); + // New Store Context for removing expired messages + StoreContext storeContext = new StoreContext(); + // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - _queue.dequeue(_reapingStoreContext, entry); + _queue.dequeue(storeContext, entry); - message.decrementReference(_reapingStoreContext); + message.decrementReference(storeContext); if (_log.isInfoEnabled()) { @@ -760,24 +796,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList) { _lock.lock(); - for (QueueEntry entry : movedMessageList) - { - addMessageToQueue(entry, false); - } - - // enqueue on the pre delivery queues - for (Subscription sub : _subscriptions.getSubscriptions()) + try { for (QueueEntry entry : movedMessageList) { - // Only give the message to those that want them. - if (sub.hasInterest(entry)) + addMessageToQueue(entry, false); + } + + // enqueue on the pre delivery queues + for (Subscription sub : _subscriptions.getSubscriptions()) + { + for (QueueEntry entry : movedMessageList) { - sub.enqueueForPreDelivery(entry, true); + // Only give the message to those that want them. + if (sub.hasInterest(entry)) + { + sub.enqueueForPreDelivery(entry, true); + } } } } - _lock.unlock(); + finally + { + _lock.unlock(); + } } /** diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 7fd53a64fd..2bb16aff2e 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -139,7 +139,7 @@ public class TxAckTest extends TestCase }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext); - _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag)); + _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map)); } _acked = acked; _unacked = unacked; |
