summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-02-15 23:23:48 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-02-15 23:23:48 +0000
commit7a48e7adf5d8db51c58878888f8a7ca62da16cf5 (patch)
tree34d8c5587f2a7f9a4d050ed1db28ddfe1fdb75cd /java/broker
parent9dbfac1edb6fd5b4d65c8e8f537eecb769c15f0a (diff)
downloadqpid-python-7a48e7adf5d8db51c58878888f8a7ca62da16cf5.tar.gz
QPID-366 : Reference counting not being decremented correctly and other persistence issues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java72
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java4
13 files changed, 136 insertions, 63 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index fa4219ecd1..8b36576a30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.MessageRouter;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -112,7 +113,7 @@ public class AMQChannel
* A context used by the message store enabling it to track context for a given channel even across
* thread boundaries
*/
- private final StoreContext _storeContext = new StoreContext();
+ private final StoreContext _storeContext;
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
@@ -120,12 +121,16 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
+ private final AMQProtocolSession _session;
- public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+
+ public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
+ _session = session;
_channelId = channelId;
+ _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
_prefetch_HighWaterMark = DEFAULT_PREFETCH;
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
@@ -338,7 +343,8 @@ public class AMQChannel
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
- _txnContext.commit();
+ _txnContext.commit();
+
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -386,8 +392,10 @@ public class AMQChannel
_txnContext.deliver(unacked.message, unacked.queue);
}
}
+
}
+
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
@@ -403,7 +411,7 @@ public class AMQChannel
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
{
msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
}
@@ -417,6 +425,7 @@ public class AMQChannel
msgToRequeue.add(message);
}
}
+
// false means continue processing
return false;
}
@@ -430,6 +439,7 @@ public class AMQChannel
{
_txnContext.deliver(message.message, message.queue);
_unacknowledgedMessageMap.remove(message.deliveryTag);
+ message.message.decrementReference(_storeContext);
}
}
@@ -559,6 +569,8 @@ public class AMQChannel
public void rollback() throws AMQException
{
_txnContext.rollback();
+
+
}
public String toString()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index bbfab8132c..c987c12154 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -100,6 +100,7 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (UnacknowledgedMessage msg : _unacked)
{
+ msg.restoreTransientMessageData();
msg.discard(storeContext);
}
}
@@ -112,6 +113,7 @@ public class TxAck implements TxnOp
//in memory (persistent changes will be rolled back by store)
for (UnacknowledgedMessage msg : _unacked)
{
+ msg.clearTransientMessageData();
msg.message.incrementReference();
}
}
@@ -120,6 +122,11 @@ public class TxAck implements TxnOp
{
//remove the unacked messages from the channels map
_map.remove(_unacked);
+ for (UnacknowledgedMessage msg : _unacked)
+ {
+ msg.clearTransientMessageData();
+ }
+
}
public void rollback(StoreContext storeContext)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index ff3c901be5..3f2348b71b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -50,5 +50,15 @@ public class UnacknowledgedMessage
}
message.decrementReference(storeContext);
}
+
+ public void restoreTransientMessageData() throws AMQException
+ {
+ message.restoreTransientMessageData();
+ }
+
+ public void clearTransientMessageData()
+ {
+ message.clearTransientMessageData();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
index fb198ef4f7..03fc7a3926 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
@@ -49,7 +49,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(),
+ final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index f23ec85391..be81734ae4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -111,7 +108,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
}
catch (AMQException e)
{
@@ -124,7 +121,7 @@ public class AMQMessage
{
try
{
- ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
return ContentBody.createAMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -141,6 +138,11 @@ public class AMQMessage
}
}
+ private StoreContext getStoreContext()
+ {
+ return _txnContext.getStoreContext();
+ }
+
private class BodyContentIterator implements Iterator<ContentBody>
{
@@ -150,7 +152,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
}
catch (AMQException e)
{
@@ -163,7 +165,7 @@ public class AMQMessage
{
try
{
- return _messageHandle.getContentBody(_messageId, ++_index);
+ return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index);
}
catch (AMQException e)
{
@@ -201,10 +203,11 @@ public class AMQMessage
* @param factory
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
+ _txnContext = txnConext;
_transientMessageData = null;
}
@@ -276,7 +279,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.getContentHeaderBody(_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
}
}
@@ -342,14 +345,16 @@ public class AMQMessage
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount);
+
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
}
}
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -365,7 +370,9 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message");
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+
+
}
// must check if the handle is null since there may be cases where we decide to throw away a message
@@ -386,7 +393,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId);
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
@@ -475,7 +482,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.isPersistent(_messageId);
+ return _messageHandle.isPersistent(getStoreContext(),_messageId);
}
}
@@ -504,7 +511,7 @@ public class AMQMessage
}
else
{
- pb = _messageHandle.getPublishBody(_messageId);
+ pb = _messageHandle.getPublishBody(getStoreContext(),_messageId);
}
return pb;
}
@@ -541,7 +548,7 @@ public class AMQMessage
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
if (_log.isDebugEnabled())
{
- _log.debug("Delivering message " + _messageId);
+ _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
}
try
{
@@ -575,7 +582,7 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -591,7 +598,7 @@ public class AMQMessage
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -603,7 +610,7 @@ public class AMQMessage
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(_messageId, i);
+ cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
}
@@ -619,7 +626,7 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
if(bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
@@ -634,7 +641,7 @@ public class AMQMessage
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+ ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0);
AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -646,7 +653,7 @@ public class AMQMessage
//
for(int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentBody(_messageId, i);
+ cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i);
protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
}
@@ -749,13 +756,30 @@ public class AMQMessage
}
catch (AMQException e)
{
- _log.error(e);
+ _log.error(e.toString(),e);
return 0;
}
}
+
+ public void restoreTransientMessageData() throws AMQException
+ {
+ TransientMessageData transientMessageData = new TransientMessageData();
+ transientMessageData.setPublishBody(getPublishBody());
+ transientMessageData.setContentHeaderBody(getContentHeaderBody());
+ transientMessageData.addBodyLength(getContentHeaderBody().getSize());
+ _transientMessageData = transientMessageData;
+ }
+
+
+ public void clearTransientMessageData()
+ {
+ _transientMessageData = null;
+ }
+
+
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
index 6aa8f98403..210c9f01a8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
@@ -35,17 +35,17 @@ import org.apache.qpid.server.store.StoreContext;
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException;
+ ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException;
/**
* @return the number of body frames associated with this message
*/
- int getBodyCount(Long messageId) throws AMQException;
+ int getBodyCount(StoreContext context, Long messageId) throws AMQException;
/**
* @return the size of the body
*/
- long getBodySize(Long messageId) throws AMQException;
+ long getBodySize(StoreContext context, Long messageId) throws AMQException;
/**
* Get a particular content body
@@ -53,17 +53,17 @@ public interface AMQMessageHandle
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException;
void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
- BasicPublishBody getPublishBody(Long messageId) throws AMQException;
+ BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException;
boolean isRedelivered();
void setRedelivered(boolean redelivered);
- boolean isPersistent(Long messageId) throws AMQException;
+ boolean isPersistent(StoreContext context, Long messageId) throws AMQException;
void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
index 5890d7b72c..79f875ce1e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
@@ -49,22 +49,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle
{
}
- public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
{
return _contentHeaderBody;
}
- public int getBodyCount(Long messageId)
+ public int getBodyCount(StoreContext context, Long messageId)
{
return _contentBodies.size();
}
- public long getBodySize(Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId) throws AMQException
{
- return getContentHeaderBody(messageId).bodySize;
+ return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -80,7 +80,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_contentBodies.add(contentBody);
}
- public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
{
return _publishBody;
}
@@ -96,10 +96,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(messageId);
+ ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 8e270f9772..05841ccfc0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -267,9 +267,11 @@ public class SubscriptionImpl implements Subscription
if (_acks)
{
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ msg.decrementReference(storeContext);
}
msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
index 161913ef15..670d895950 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
@@ -56,21 +56,21 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_messageStore = messageStore;
}
- public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
{
ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
if (chb == null)
{
- MessageMetaData mmd = loadMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(context, messageId);
chb = mmd.getContentHeaderBody();
}
return chb;
}
- private MessageMetaData loadMessageMetaData(Long messageId)
+ private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId)
throws AMQException
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
populateFromMessageMetaData(mmd);
return mmd;
}
@@ -82,11 +82,11 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
}
- public int getBodyCount(Long messageId) throws AMQException
+ public int getBodyCount(StoreContext context, Long messageId) throws AMQException
{
if (_contentBodies == null)
{
- MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
int chunkCount = mmd.getContentChunkCount();
_contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
for (int i = 0; i < chunkCount; i++)
@@ -97,12 +97,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
return _contentBodies.size();
}
- public long getBodySize(Long messageId) throws AMQException
+ public long getBodySize(StoreContext context, Long messageId) throws AMQException
{
- return getContentHeaderBody(messageId).bodySize;
+ return getContentHeaderBody(context, messageId).bodySize;
}
- public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -113,7 +113,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
ContentBody cb = wr.get();
if (cb == null)
{
- cb = _messageStore.getContentBodyChunk(messageId, index);
+ cb = _messageStore.getContentBodyChunk(context, messageId, index);
_contentBodies.set(index, new WeakReference<ContentBody>(cb));
}
return cb;
@@ -145,12 +145,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
}
- public BasicPublishBody getPublishBody(Long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException
{
BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
if (bpb == null)
{
- MessageMetaData mmd = loadMessageMetaData(messageId);
+ MessageMetaData mmd = loadMessageMetaData(context, messageId);
bpb = mmd.getPublishBody();
}
@@ -167,10 +167,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle
_redelivered = redelivered;
}
- public boolean isPersistent(Long messageId) throws AMQException
+ public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
- ContentHeaderBody chb = getContentHeaderBody(messageId);
+ ContentHeaderBody chb = getContentHeaderBody(context, messageId);
return chb.properties instanceof BasicContentHeaderProperties &&
((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 6c4ad10429..f678cea630 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -174,12 +174,12 @@ public class MemoryMessageStore implements MessageStore
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
{
return _metaDataMap.get(messageId);
}
- public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index d707ece8da..7fa46eb1ca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -84,8 +84,8 @@ public interface MessageStore
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
- MessageMetaData getMessageMetaData(Long messageId) throws AMQException;
+ MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
- ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException;
+ ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index 55e5067852..2e2f2ba7d6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.store;
+import org.apache.log4j.Logger;
+
+
/**
* A context that the store can use to associate with a transactional context. For example, it could store
* some kind of txn id.
@@ -28,8 +31,22 @@ package org.apache.qpid.server.store;
*/
public class StoreContext
{
+
+ private static final Logger _logger = Logger.getLogger(StoreContext.class);
+
+ private String _name;
private Object _payload;
+ public StoreContext()
+ {
+ _name = super.toString();
+ }
+
+ public StoreContext(String name)
+ {
+ _name = name;
+ }
+
public Object getPayload()
{
return _payload;
@@ -37,6 +54,7 @@ public class StoreContext
public void setPayload(Object payload)
{
+ _logger.debug("["+_name+"] Setting payload: " + payload);
_payload = payload;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 7481a96ae4..5c915b5c84 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -168,7 +168,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Starting transaction on message store");
+ _log.debug("Starting transaction on message store: " + this);
}
_messageStore.beginTran(_storeContext);
_inTran = true;
@@ -179,7 +179,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
if (_log.isDebugEnabled())
{
- _log.debug("Committing transactional context");
+ _log.debug("Committing transactional context: " + this);
}
if (_ackOp != null)
{