summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-08-08 12:19:41 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-08-08 12:19:41 +0000
commit1af47c409e67c090b016463798bd4bfea8d0653d (patch)
tree4c585f8b9b9f47ad12de73bc542a1d5a93974726 /java/broker/src/main
parent60705fbd0483520d2721e57162429ba09132579b (diff)
downloadqpid-python-1af47c409e67c090b016463798bd4bfea8d0653d.tar.gz
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP. Merged from M2.1.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683949 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java22
3 files changed, 15 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index 8e5b631f96..c80a96f967 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
@@ -55,8 +56,8 @@ public interface UnacknowledgedMessageMap
QueueEntry remove(long deliveryTag);
- void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException;
-
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
+
Collection<QueueEntry> cancelAllMessages();
void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 79208ab426..ef48b60bcd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -160,7 +161,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
+
{
synchronized (_lock)
{
@@ -175,6 +177,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
@@ -182,7 +188,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
unacked.getValue().restoreCredit();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 18f1836185..03d59d3ab9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -154,28 +154,13 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
- LinkedList<QueueEntry> acked = new LinkedList<QueueEntry>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (QueueEntry msg : acked)
- {
- if (debug)
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
- if(msg.getMessage().isPersistent())
- {
- beginTranIfNecessary();
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
QueueEntry msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -197,6 +182,9 @@ public class NonTransactionalContext implements TransactionalContext
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
+ unacknowledgedMessageMap.remove(deliveryTag);
+
+
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +