summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-05-06 09:26:37 +0000
committerAidan Skinner <aidan@apache.org>2008-05-06 09:26:37 +0000
commit62c72d3efd475f1f9b3e82deb05fdedf0223e498 (patch)
treebc3d82e87d262a0a0e5eebded394396142f8be51 /java
parent187caa499197542fadc01daf2ce48c8d7dce470f (diff)
downloadqpid-python-62c72d3efd475f1f9b3e82deb05fdedf0223e498.tar.gz
Merged revisions 652388-652389,652399,652567-652568,653416 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x ........ r652388 | ritchiem | 2008-04-30 15:40:18 +0100 (Wed, 30 Apr 2008) | 2 lines QPID-889 : Removed _reapingStoreContext from CSDM replaced with local StoreContext()s so they are not reused by different threads. ........ r652389 | ritchiem | 2008-04-30 15:40:45 +0100 (Wed, 30 Apr 2008) | 1 line QPID-887 : Renamed QueueHouseKeeping threads so they can be identified in thread dump. Named Queue-housekeeping-<virtualhost name> ........ r652399 | ritchiem | 2008-04-30 16:32:42 +0100 (Wed, 30 Apr 2008) | 1 line QPID-888,QPID-886 : Fixed all management uses of _lock.lock / _lock.unlock so that they correctly call unlock from a finally block in the CSDM. There are two issues that cover that. QPID-888 - Fix the management ones and QPID-886 to fix the use in removeExpired. ........ r652567 | aidan | 2008-05-01 17:32:20 +0100 (Thu, 01 May 2008) | 1 line QPID-994 Dont wait for attain state as connection is closed by we get CloseOk ........ r652568 | aidan | 2008-05-01 17:35:09 +0100 (Thu, 01 May 2008) | 1 line QPID-1001 dont set the expiration time if TTL is 0 ........ r653416 | aidan | 2008-05-05 11:24:50 +0100 (Mon, 05 May 2008) | 1 line QPID-1019 prevent messages being dequeued unecessarily, from rgodfrey ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@653720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java178
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java2
4 files changed, 130 insertions, 75 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 74169a19bb..1314b2b715 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -469,7 +469,7 @@ public class AMQChannel
synchronized (_unacknowledgedMessageMap.getLock())
{
- _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag));
+ _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(entry, consumerTag, deliveryTag,_unacknowledgedMessageMap));
checkSuspension();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index df7cecc940..0112d3b388 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -34,13 +34,18 @@ public class UnacknowledgedMessage
public final long deliveryTag;
private boolean _queueDeleted;
+ private final UnacknowledgedMessageMap _unacknowledgeMessageMap;
- public UnacknowledgedMessage(QueueEntry entry, AMQShortString consumerTag, long deliveryTag)
+ public UnacknowledgedMessage(QueueEntry entry,
+ AMQShortString consumerTag,
+ long deliveryTag,
+ final UnacknowledgedMessageMap unacknowledgedMessageMap)
{
this.entry = entry;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
+ _unacknowledgeMessageMap = unacknowledgedMessageMap;
}
public String toString()
@@ -60,12 +65,20 @@ public class UnacknowledgedMessage
public void discard(StoreContext storeContext) throws AMQException
{
- if (entry.getQueue() != null)
+ synchronized(_unacknowledgeMessageMap)
{
- entry.getQueue().dequeue(storeContext, entry);
+ if(_unacknowledgeMessageMap.contains(deliveryTag))
+ {
+
+ if (entry.getQueue() != null)
+ {
+ entry.getQueue().dequeue(storeContext, entry);
+ }
+ //if the queue is null then the message is waiting to be acked, but has been removed.
+ entry.getMessage().decrementReference(storeContext);
+ }
}
- //if the queue is null then the message is waiting to be acked, but has been removed.
- entry.getMessage().decrementReference(storeContext);
+
}
public AMQMessage getMessage()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 7dfcae95c3..cf607548f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -87,10 +87,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
-
- /** Used by any reaping thread to purge messages */
- private StoreContext _reapingStoreContext = new StoreContext();
-
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -218,22 +214,32 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void removeExpired() throws AMQException
{
_lock.lock();
-
-
- for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
+ try
{
- QueueEntry entry = iter.next();
- if(entry.expired())
+ // New Context to for dealing with the MessageStore.
+ StoreContext context = new StoreContext();
+
+ for(Iterator<QueueEntry> iter = _messages.iterator(); iter.hasNext();)
{
- // fixme: Currently we have to update the total byte size here for the data in the queue
- _totalMessageSize.addAndGet(-entry.getSize());
- _queue.dequeue(_reapingStoreContext,entry);
- iter.remove();
- }
- }
+ QueueEntry entry = iter.next();
+ if(entry.expired())
+ {
+ // fixme: Currently we have to update the total byte size here for the data in the queue
+ _totalMessageSize.addAndGet(-entry.getSize());
+ // Remove the message from the queue in the MessageStore
+ _queue.dequeue(context,entry);
- _lock.unlock();
+ // This queue nolonger needs a reference to this message
+ entry.getMessage().decrementReference(context);
+ iter.remove();
+ }
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
/** @return the state of the async processor. */
@@ -249,14 +255,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
public List<QueueEntry> getMessages()
{
- _lock.lock();
- List<QueueEntry> list = new ArrayList<QueueEntry>();
+ List<QueueEntry> list = new ArrayList<QueueEntry>();
- for (QueueEntry entry : _messages)
+ _lock.lock();
+ try
{
- list.add(entry);
+ for (QueueEntry entry : _messages)
+ {
+ list.add(entry);
+ }
+ }
+ finally
+ {
+ _lock.unlock();
}
- _lock.unlock();
return list;
}
@@ -278,24 +290,28 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
long maxMessageCount = toMessageId - fromMessageId + 1;
- _lock.lock();
-
List<QueueEntry> foundMessagesList = new ArrayList<QueueEntry>();
-
- for (QueueEntry entry : _messages)
+ _lock.lock();
+ try
{
- long msgId = entry.getMessage().getMessageId();
- if (msgId >= fromMessageId && msgId <= toMessageId)
+ for (QueueEntry entry : _messages)
{
- foundMessagesList.add(entry);
- }
- // break if the no of messages are found
- if (foundMessagesList.size() == maxMessageCount)
- {
- break;
+ long msgId = entry.getMessage().getMessageId();
+ if (msgId >= fromMessageId && msgId <= toMessageId)
+ {
+ foundMessagesList.add(entry);
+ }
+ // break if the no of messages are found
+ if (foundMessagesList.size() == maxMessageCount)
+ {
+ break;
+ }
}
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
return foundMessagesList;
}
@@ -445,45 +461,62 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_lock.lock();
- QueueEntry entry = _messages.poll();
-
- if (entry != null)
+ try
{
- queue.dequeue(storeContext, entry);
+ QueueEntry entry = _messages.poll();
- _totalMessageSize.addAndGet(-entry.getSize());
+ if (entry != null)
+ {
+ queue.dequeue(storeContext, entry);
- //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
- entry.getMessage().decrementReference(storeContext);
+ _totalMessageSize.addAndGet(-entry.getSize());
- }
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ entry.getMessage().decrementReference(storeContext);
- _lock.unlock();
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
}
public long clearAllMessages(StoreContext storeContext) throws AMQException
{
long count = 0;
- _lock.lock();
- synchronized (_queueHeadLock)
+ _lock.lock();
+ try
{
- QueueEntry entry = getNextMessage();
- while (entry != null)
+ synchronized (_queueHeadLock)
{
- //and remove it
- _messages.poll();
+ QueueEntry entry = getNextMessage();
- _queue.dequeue(storeContext, entry);
+ // todo: note: why do we need this? Why not reuse the passed 'storeContext'
+ //Create a new StoreContext for decrementing the References
+ StoreContext context = new StoreContext();
+
+ while (entry != null)
+ {
+ //and remove it
+ _messages.poll();
- entry.getMessage().decrementReference(_reapingStoreContext);
+ // todo: NOTE: Why is this a different context to the new local 'context'?
+ _queue.dequeue(storeContext, entry);
- entry = getNextMessage();
- count++;
+ entry.getMessage().decrementReference(context);
+
+ entry = getNextMessage();
+ count++;
+ }
+ _totalMessageSize.set(0L);
}
- _totalMessageSize.set(0L);
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
return count;
}
@@ -518,10 +551,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
_totalMessageSize.addAndGet(-entry.getSize());
+ // New Store Context for removing expired messages
+ StoreContext storeContext = new StoreContext();
+
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- _queue.dequeue(_reapingStoreContext, entry);
+ _queue.dequeue(storeContext, entry);
- message.decrementReference(_reapingStoreContext);
+ message.decrementReference(storeContext);
if (_log.isInfoEnabled())
{
@@ -760,24 +796,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> movedMessageList)
{
_lock.lock();
- for (QueueEntry entry : movedMessageList)
- {
- addMessageToQueue(entry, false);
- }
-
- // enqueue on the pre delivery queues
- for (Subscription sub : _subscriptions.getSubscriptions())
+ try
{
for (QueueEntry entry : movedMessageList)
{
- // Only give the message to those that want them.
- if (sub.hasInterest(entry))
+ addMessageToQueue(entry, false);
+ }
+
+ // enqueue on the pre delivery queues
+ for (Subscription sub : _subscriptions.getSubscriptions())
+ {
+ for (QueueEntry entry : movedMessageList)
{
- sub.enqueueForPreDelivery(entry, true);
+ // Only give the message to those that want them.
+ if (sub.hasInterest(entry))
+ {
+ sub.enqueueForPreDelivery(entry, true);
+ }
}
}
}
- _lock.unlock();
+ finally
+ {
+ _lock.unlock();
+ }
}
/**
diff --git a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 7fd53a64fd..2bb16aff2e 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -139,7 +139,7 @@ public class TxAckTest extends TestCase
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
- _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag));
+ _map.add(deliveryTag, new UnacknowledgedMessage(new QueueEntry(null,message), null, deliveryTag, _map));
}
_acked = acked;
_unacked = unacked;