summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-03-15 15:39:39 +0000
committerRobert Greig <rgreig@apache.org>2007-03-15 15:39:39 +0000
commit2221875e2742c75ee5728e4a458740e5ff98cc97 (patch)
tree77bdf1f321ced6ee4698535c8b2a8de14d6e8238 /java/broker/src
parentbbc2f087fb974cd976c423bdbdd9ca6123d06ccb (diff)
downloadqpid-python-2221875e2742c75ee5728e4a458740e5ff98cc97.tar.gz
Short pause to help ensure connection.close comes after last message ack, added to PropertyValueTest
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518667 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java23
3 files changed, 22 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index fb16267d97..ff933d3c0b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -36,12 +36,14 @@ public abstract class RequiredDeliveryException extends AMQException
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
- _amqMessage = payload;
+
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- payload.incrementReference();
+ // handler. So increment here.
+ _amqMessage = payload.takeReference();
+
+ //payload.incrementReference();
}
public AMQMessage getAMQMessage()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index aac9408247..5ca8d57f7c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -116,7 +116,7 @@ public class TxAck implements TxnOp
for (UnacknowledgedMessage msg : _unacked)
{
msg.clearTransientMessageData();
- msg.message.incrementReference();
+ msg.message.takeReference();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index cdf316f2d7..d6962d28cd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -355,15 +355,22 @@ public class AMQMessage
return _messageId;
}
+ /**
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ */
+ public AMQMessage takeReference()
+ {
+ _referenceCount.incrementAndGet();
+ return this;
+ }
+
/** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ protected void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
-
_log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-
}
}
@@ -377,11 +384,13 @@ public class AMQMessage
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
+ int count = _referenceCount.decrementAndGet();
+
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
// not relying on the all the increments having taken place before the delivery manager decrements.
- if (_referenceCount.decrementAndGet() == 0)
+ if (count == 0)
{
try
{
@@ -408,13 +417,13 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
- if (_referenceCount.get() < 0)
+ _log.debug("Decremented ref count is now " + count + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
+ if (count < 0)
{
Thread.dumpStack();
}
}
- if (_referenceCount.get() < 0)
+ if (count < 0)
{
throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
}