diff options
| author | Robert Greig <rgreig@apache.org> | 2007-03-15 15:39:39 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2007-03-15 15:39:39 +0000 |
| commit | 2221875e2742c75ee5728e4a458740e5ff98cc97 (patch) | |
| tree | 77bdf1f321ced6ee4698535c8b2a8de14d6e8238 /java/broker/src | |
| parent | bbc2f087fb974cd976c423bdbdd9ca6123d06ccb (diff) | |
| download | qpid-python-2221875e2742c75ee5728e4a458740e5ff98cc97.tar.gz | |
Short pause to help ensure connection.close comes after last message ack, added to PropertyValueTest
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518667 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
3 files changed, 22 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index fb16267d97..ff933d3c0b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -36,12 +36,14 @@ public abstract class RequiredDeliveryException extends AMQException public RequiredDeliveryException(String message, AMQMessage payload) { super(message); - _amqMessage = payload; + // Increment the reference as this message is in the routing phase // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the - // handler. So increment here. - payload.incrementReference(); + // handler. So increment here. + _amqMessage = payload.takeReference(); + + //payload.incrementReference(); } public AMQMessage getAMQMessage() diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index aac9408247..5ca8d57f7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -116,7 +116,7 @@ public class TxAck implements TxnOp for (UnacknowledgedMessage msg : _unacked) { msg.clearTransientMessageData(); - msg.message.incrementReference(); + msg.message.takeReference(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index cdf316f2d7..d6962d28cd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -355,15 +355,22 @@ public class AMQMessage return _messageId; } + /** + * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation. + */ + public AMQMessage takeReference() + { + _referenceCount.incrementAndGet(); + return this; + } + /** Threadsafe. Increment the reference count on the message. */ - public void incrementReference() + protected void incrementReference() { _referenceCount.incrementAndGet(); if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); - } } @@ -377,11 +384,13 @@ public class AMQMessage */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { + int count = _referenceCount.decrementAndGet(); + // note that the operation of decrementing the reference count and then removing the message does not // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after // the message has been passed to all queues. i.e. we are // not relying on the all the increments having taken place before the delivery manager decrements. - if (_referenceCount.decrementAndGet() == 0) + if (count == 0) { try { @@ -408,13 +417,13 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5)); - if (_referenceCount.get() < 0) + _log.debug("Decremented ref count is now " + count + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5)); + if (count < 0) { Thread.dumpStack(); } } - if (_referenceCount.get() < 0) + if (count < 0) { throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0."); } |
