summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-03-15 15:39:39 +0000
committerRobert Greig <rgreig@apache.org>2007-03-15 15:39:39 +0000
commit2221875e2742c75ee5728e4a458740e5ff98cc97 (patch)
tree77bdf1f321ced6ee4698535c8b2a8de14d6e8238
parentbbc2f087fb974cd976c423bdbdd9ca6123d06ccb (diff)
downloadqpid-python-2221875e2742c75ee5728e4a458740e5ff98cc97.tar.gz
Short pause to help ensure connection.close comes after last message ack, added to PropertyValueTest
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518667 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java23
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java8
5 files changed, 29 insertions, 14 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index fb16267d97..ff933d3c0b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -36,12 +36,14 @@ public abstract class RequiredDeliveryException extends AMQException
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
- _amqMessage = payload;
+
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- payload.incrementReference();
+ // handler. So increment here.
+ _amqMessage = payload.takeReference();
+
+ //payload.incrementReference();
}
public AMQMessage getAMQMessage()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index aac9408247..5ca8d57f7c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -116,7 +116,7 @@ public class TxAck implements TxnOp
for (UnacknowledgedMessage msg : _unacked)
{
msg.clearTransientMessageData();
- msg.message.incrementReference();
+ msg.message.takeReference();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index cdf316f2d7..d6962d28cd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -355,15 +355,22 @@ public class AMQMessage
return _messageId;
}
+ /**
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ */
+ public AMQMessage takeReference()
+ {
+ _referenceCount.incrementAndGet();
+ return this;
+ }
+
/** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ protected void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
-
_log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-
}
}
@@ -377,11 +384,13 @@ public class AMQMessage
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
+ int count = _referenceCount.decrementAndGet();
+
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
// not relying on the all the increments having taken place before the delivery manager decrements.
- if (_referenceCount.decrementAndGet() == 0)
+ if (count == 0)
{
try
{
@@ -408,13 +417,13 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
- if (_referenceCount.get() < 0)
+ _log.debug("Decremented ref count is now " + count + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
+ if (count < 0)
{
Thread.dumpStack();
}
}
- if (_referenceCount.get() < 0)
+ if (count < 0)
{
throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index 1e50a62fee..fd997e3abd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -117,6 +117,8 @@ public class PropertyValueTest extends TestCase implements MessageListener
waitFor(count);
check();
_logger.info("Completed without failure");
+
+ Thread.sleep(10);
_connection.close();
_logger.error("End Run Number:" + (run - 1));
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 2f0eaac29a..ab6d9742e4 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -80,7 +80,8 @@ public class TestReferenceCounting extends TestCase
AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
- message.incrementReference();
+ message = message.takeReference();
+
// we call routing complete to set up the handle
message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
@@ -128,11 +129,12 @@ public class TestReferenceCounting extends TestCase
info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
- message.incrementReference();
+
+ message = message.takeReference();
// we call routing complete to set up the handle
message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
- message.incrementReference();
+ message = message.takeReference();
message.decrementReference(_storeContext);
assertTrue(_store.getMessageMetaDataMap().size() == 1);
}