diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-08-08 12:19:41 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-08-08 12:19:41 +0000 |
| commit | 1af47c409e67c090b016463798bd4bfea8d0653d (patch) | |
| tree | 4c585f8b9b9f47ad12de73bc542a1d5a93974726 /java/broker/src/main | |
| parent | 60705fbd0483520d2721e57162429ba09132579b (diff) | |
| download | qpid-python-1af47c409e67c090b016463798bd4bfea8d0653d.tar.gz | |
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP. Merged from M2.1.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683949 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
3 files changed, 15 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java index 8e5b631f96..c80a96f967 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.StoreContext; public interface UnacknowledgedMessageMap { @@ -55,8 +56,8 @@ public interface UnacknowledgedMessageMap QueueEntry remove(long deliveryTag); - void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException; - + public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException; + Collection<QueueEntry> cancelAllMessages(); void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 79208ab426..ef48b60bcd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.ack; +import org.apache.qpid.server.store.StoreContext; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; @@ -160,7 +161,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } - public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException + public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException + { synchronized (_lock) { @@ -175,6 +177,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + unacked.getValue().discard(storeContext); + it.remove(); _unackedSize -= unacked.getValue().getMessage().getSize(); @@ -182,7 +188,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap unacked.getValue().restoreCredit(); - destination.add(unacked.getValue()); if (unacked.getKey() == deliveryTag) { break; diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 18f1836185..03d59d3ab9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -154,28 +154,13 @@ public class NonTransactionalContext implements TransactionalContext throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel"); } - LinkedList<QueueEntry> acked = new LinkedList<QueueEntry>(); - unacknowledgedMessageMap.drainTo(acked, deliveryTag); - for (QueueEntry msg : acked) - { - if (debug) - { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); - } - if(msg.getMessage().isPersistent()) - { - beginTranIfNecessary(); - } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); - } + unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext); } } else { QueueEntry msg; - msg = unacknowledgedMessageMap.remove(deliveryTag); + msg = unacknowledgedMessageMap.get(deliveryTag); if (msg == null) { @@ -197,6 +182,9 @@ public class NonTransactionalContext implements TransactionalContext //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(_storeContext); + unacknowledgedMessageMap.remove(deliveryTag); + + if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + |
