diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2007-02-15 23:23:48 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2007-02-15 23:23:48 +0000 |
| commit | 7a48e7adf5d8db51c58878888f8a7ca62da16cf5 (patch) | |
| tree | 34d8c5587f2a7f9a4d050ed1db28ddfe1fdb75cd /java/broker | |
| parent | 9dbfac1edb6fd5b4d65c8e8f537eecb769c15f0a (diff) | |
| download | qpid-python-7a48e7adf5d8db51c58878888f8a7ca62da16cf5.tar.gz | |
QPID-366 : Reference counting not being decremented correctly and other persistence issues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@508235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
13 files changed, 136 insertions, 63 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index fa4219ecd1..8b36576a30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; @@ -112,7 +113,7 @@ public class AMQChannel * A context used by the message store enabling it to track context for a given channel even across * thread boundaries */ - private final StoreContext _storeContext = new StoreContext(); + private final StoreContext _storeContext; private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>(); @@ -120,12 +121,16 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + private final AMQProtocolSession _session; - public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { + _session = session; _channelId = channelId; + _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId); _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; @@ -338,7 +343,8 @@ public class AMQChannel _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); - _txnContext.commit(); + _txnContext.commit(); + } private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException @@ -386,8 +392,10 @@ public class AMQChannel _txnContext.deliver(unacked.message, unacked.queue); } } + } + /** * Called to resend all outstanding unacknowledged messages to this same channel. */ @@ -403,7 +411,7 @@ public class AMQChannel AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); - if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended()) { msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); } @@ -417,6 +425,7 @@ public class AMQChannel msgToRequeue.add(message); } } + // false means continue processing return false; } @@ -430,6 +439,7 @@ public class AMQChannel { _txnContext.deliver(message.message, message.queue); _unacknowledgedMessageMap.remove(message.deliveryTag); + message.message.decrementReference(_storeContext); } } @@ -559,6 +569,8 @@ public class AMQChannel public void rollback() throws AMQException { _txnContext.rollback(); + + } public String toString() diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index bbfab8132c..c987c12154 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -100,6 +100,7 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (UnacknowledgedMessage msg : _unacked) { + msg.restoreTransientMessageData(); msg.discard(storeContext); } } @@ -112,6 +113,7 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (UnacknowledgedMessage msg : _unacked) { + msg.clearTransientMessageData(); msg.message.incrementReference(); } } @@ -120,6 +122,11 @@ public class TxAck implements TxnOp { //remove the unacked messages from the channels map _map.remove(_unacked); + for (UnacknowledgedMessage msg : _unacked) + { + msg.clearTransientMessageData(); + } + } public void rollback(StoreContext storeContext) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index ff3c901be5..3f2348b71b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -50,5 +50,15 @@ public class UnacknowledgedMessage } message.decrementReference(storeContext); } + + public void restoreTransientMessageData() throws AMQException + { + message.restoreTransientMessageData(); + } + + public void clearTransientMessageData() + { + message.clearTransientMessageData(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index fb198ef4f7..03fc7a3926 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -49,7 +49,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(evt.getChannelId(), virtualHost.getMessageStore(), + final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(), virtualHost.getExchangeRegistry()); session.addChannel(channel); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index f23ec85391..be81734ae4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.server.queue; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -111,7 +108,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; } catch (AMQException e) { @@ -124,7 +121,7 @@ public class AMQMessage { try { - ContentBody cb = _messageHandle.getContentBody(_messageId, ++_index); + ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index); return ContentBody.createAMQFrame(_channel, cb); } catch (AMQException e) @@ -141,6 +138,11 @@ public class AMQMessage } } + private StoreContext getStoreContext() + { + return _txnContext.getStoreContext(); + } + private class BodyContentIterator implements Iterator<ContentBody> { @@ -150,7 +152,7 @@ public class AMQMessage { try { - return _index < _messageHandle.getBodyCount(_messageId) - 1; + return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1; } catch (AMQException e) { @@ -163,7 +165,7 @@ public class AMQMessage { try { - return _messageHandle.getContentBody(_messageId, ++_index); + return _messageHandle.getContentBody(getStoreContext(),_messageId, ++_index); } catch (AMQException e) { @@ -201,10 +203,11 @@ public class AMQMessage * @param factory * @throws AMQException */ - public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException + public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException { _messageId = messageId; _messageHandle = factory.createMessageHandle(messageId, store, true); + _txnContext = txnConext; _transientMessageData = null; } @@ -276,7 +279,7 @@ public class AMQMessage } else { - return _messageHandle.getContentHeaderBody(_messageId); + return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId); } } @@ -342,14 +345,16 @@ public class AMQMessage _referenceCount.incrementAndGet(); if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount); + + _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + } } /** * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. - * + * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ @@ -365,7 +370,9 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " is zero; removing message"); + _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); + + } // must check if the handle is null since there may be cases where we decide to throw away a message @@ -386,7 +393,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId); + _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4)); if (_referenceCount.get() < 0) { Thread.dumpStack(); @@ -475,7 +482,7 @@ public class AMQMessage } else { - return _messageHandle.isPersistent(_messageId); + return _messageHandle.isPersistent(getStoreContext(),_messageId); } } @@ -504,7 +511,7 @@ public class AMQMessage } else { - pb = _messageHandle.getPublishBody(_messageId); + pb = _messageHandle.getPublishBody(getStoreContext(),_messageId); } return pb; } @@ -541,7 +548,7 @@ public class AMQMessage List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); if (_log.isDebugEnabled()) { - _log.debug("Delivering message " + _messageId); + _log.debug("Delivering message " + _messageId + " to " + destinationQueues); } try { @@ -575,7 +582,7 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(_messageId); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); if(bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, @@ -591,7 +598,7 @@ public class AMQMessage // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentBody cb = _messageHandle.getContentBody(_messageId, 0); + ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0); AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -603,7 +610,7 @@ public class AMQMessage // for(int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentBody(_messageId, i); + cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i); protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); } @@ -619,7 +626,7 @@ public class AMQMessage AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, getContentHeaderBody()); - final int bodyCount = _messageHandle.getBodyCount(_messageId); + final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId); if(bodyCount == 0) { SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver, @@ -634,7 +641,7 @@ public class AMQMessage // Optimise the case where we have a single content body. In that case we create a composite block // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver. // - ContentBody cb = _messageHandle.getContentBody(_messageId, 0); + ContentBody cb = _messageHandle.getContentBody(getStoreContext(),_messageId, 0); AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb); AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; @@ -646,7 +653,7 @@ public class AMQMessage // for(int i = 1; i < bodyCount; i++) { - cb = _messageHandle.getContentBody(_messageId, i); + cb = _messageHandle.getContentBody(getStoreContext(),_messageId, i); protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb)); } @@ -749,13 +756,30 @@ public class AMQMessage } catch (AMQException e) { - _log.error(e); + _log.error(e.toString(),e); return 0; } } + + public void restoreTransientMessageData() throws AMQException + { + TransientMessageData transientMessageData = new TransientMessageData(); + transientMessageData.setPublishBody(getPublishBody()); + transientMessageData.setContentHeaderBody(getContentHeaderBody()); + transientMessageData.addBodyLength(getContentHeaderBody().getSize()); + _transientMessageData = transientMessageData; + } + + + public void clearTransientMessageData() + { + _transientMessageData = null; + } + + public String toString() { return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 6aa8f98403..210c9f01a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -35,17 +35,17 @@ import org.apache.qpid.server.store.StoreContext; */ public interface AMQMessageHandle { - ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException; + ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException; /** * @return the number of body frames associated with this message */ - int getBodyCount(Long messageId) throws AMQException; + int getBodyCount(StoreContext context, Long messageId) throws AMQException; /** * @return the size of the body */ - long getBodySize(Long messageId) throws AMQException; + long getBodySize(StoreContext context, Long messageId) throws AMQException; /** * Get a particular content body @@ -53,17 +53,17 @@ public interface AMQMessageHandle * @return a content body * @throws IllegalArgumentException if the index is invalid */ - ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException; + ContentBody getContentBody(StoreContext context, Long messageId, int index) throws IllegalArgumentException, AMQException; void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException; - BasicPublishBody getPublishBody(Long messageId) throws AMQException; + BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException; boolean isRedelivered(); void setRedelivered(boolean redelivered); - boolean isPersistent(Long messageId) throws AMQException; + boolean isPersistent(StoreContext context, Long messageId) throws AMQException; void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 5890d7b72c..79f875ce1e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -49,22 +49,22 @@ public class InMemoryMessageHandle implements AMQMessageHandle { } - public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException { return _contentHeaderBody; } - public int getBodyCount(Long messageId) + public int getBodyCount(StoreContext context, Long messageId) { return _contentBodies.size(); } - public long getBodySize(Long messageId) throws AMQException + public long getBodySize(StoreContext context, Long messageId) throws AMQException { - return getContentHeaderBody(messageId).bodySize; + return getContentHeaderBody(context, messageId).bodySize; } - public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -80,7 +80,7 @@ public class InMemoryMessageHandle implements AMQMessageHandle _contentBodies.add(contentBody); } - public BasicPublishBody getPublishBody(Long messageId) throws AMQException + public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException { return _publishBody; } @@ -96,10 +96,10 @@ public class InMemoryMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(Long messageId) throws AMQException + public boolean isPersistent(StoreContext context, Long messageId) throws AMQException { //todo remove literal values to a constant file such as AMQConstants in common - ContentHeaderBody chb = getContentHeaderBody(messageId); + ContentHeaderBody chb = getContentHeaderBody(context, messageId); return chb.properties instanceof BasicContentHeaderProperties && ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 8e270f9772..05841ccfc0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -267,9 +267,11 @@ public class SubscriptionImpl implements Subscription if (_acks) { channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + msg.decrementReference(storeContext); } msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag); + } } finally diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java index 161913ef15..670d895950 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java @@ -56,21 +56,21 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _messageStore = messageStore; } - public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException + public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException { ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null); if (chb == null) { - MessageMetaData mmd = loadMessageMetaData(messageId); + MessageMetaData mmd = loadMessageMetaData(context, messageId); chb = mmd.getContentHeaderBody(); } return chb; } - private MessageMetaData loadMessageMetaData(Long messageId) + private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId) throws AMQException { - MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId); populateFromMessageMetaData(mmd); return mmd; } @@ -82,11 +82,11 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody()); } - public int getBodyCount(Long messageId) throws AMQException + public int getBodyCount(StoreContext context, Long messageId) throws AMQException { if (_contentBodies == null) { - MessageMetaData mmd = _messageStore.getMessageMetaData(messageId); + MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId); int chunkCount = mmd.getContentChunkCount(); _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount); for (int i = 0; i < chunkCount; i++) @@ -97,12 +97,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle return _contentBodies.size(); } - public long getBodySize(Long messageId) throws AMQException + public long getBodySize(StoreContext context, Long messageId) throws AMQException { - return getContentHeaderBody(messageId).bodySize; + return getContentHeaderBody(context, messageId).bodySize; } - public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException + public ContentBody getContentBody(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException { if (index > _contentBodies.size() - 1) { @@ -113,7 +113,7 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle ContentBody cb = wr.get(); if (cb == null) { - cb = _messageStore.getContentBodyChunk(messageId, index); + cb = _messageStore.getContentBodyChunk(context, messageId, index); _contentBodies.set(index, new WeakReference<ContentBody>(cb)); } return cb; @@ -145,12 +145,12 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody); } - public BasicPublishBody getPublishBody(Long messageId) throws AMQException + public BasicPublishBody getPublishBody(StoreContext context, Long messageId) throws AMQException { BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null); if (bpb == null) { - MessageMetaData mmd = loadMessageMetaData(messageId); + MessageMetaData mmd = loadMessageMetaData(context, messageId); bpb = mmd.getPublishBody(); } @@ -167,10 +167,10 @@ public class WeakReferenceMessageHandle implements AMQMessageHandle _redelivered = redelivered; } - public boolean isPersistent(Long messageId) throws AMQException + public boolean isPersistent(StoreContext context, Long messageId) throws AMQException { //todo remove literal values to a constant file such as AMQConstants in common - ContentHeaderBody chb = getContentHeaderBody(messageId); + ContentHeaderBody chb = getContentHeaderBody(context, messageId); return chb.properties instanceof BasicContentHeaderProperties && ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 6c4ad10429..f678cea630 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -174,12 +174,12 @@ public class MemoryMessageStore implements MessageStore _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException { return _metaDataMap.get(messageId); } - public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException + public ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { List<ContentBody> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index d707ece8da..7fa46eb1ca 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -84,8 +84,8 @@ public interface MessageStore void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; - MessageMetaData getMessageMetaData(Long messageId) throws AMQException; + MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; - ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException; + ContentBody getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index 55e5067852..2e2f2ba7d6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store; +import org.apache.log4j.Logger; + + /** * A context that the store can use to associate with a transactional context. For example, it could store * some kind of txn id. @@ -28,8 +31,22 @@ package org.apache.qpid.server.store; */ public class StoreContext { + + private static final Logger _logger = Logger.getLogger(StoreContext.class); + + private String _name; private Object _payload; + public StoreContext() + { + _name = super.toString(); + } + + public StoreContext(String name) + { + _name = name; + } + public Object getPayload() { return _payload; @@ -37,6 +54,7 @@ public class StoreContext public void setPayload(Object payload) { + _logger.debug("["+_name+"] Setting payload: " + payload); _payload = payload; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 7481a96ae4..5c915b5c84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -168,7 +168,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Starting transaction on message store"); + _log.debug("Starting transaction on message store: " + this); } _messageStore.beginTran(_storeContext); _inTran = true; @@ -179,7 +179,7 @@ public class LocalTransactionalContext implements TransactionalContext { if (_log.isDebugEnabled()) { - _log.debug("Committing transactional context"); + _log.debug("Committing transactional context: " + this); } if (_ackOp != null) { |
