diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:50:01 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-02-20 14:50:01 +0000 |
| commit | ead7d6b126b0749fbc9b1102e26060c36af4df8b (patch) | |
| tree | 2d6de325bb8d4c64219aa2c9d6393302119a0b6e /java/broker/src/main | |
| parent | e2a5f05f47c57dab2b78253a347d9e33e63720e6 (diff) | |
| download | qpid-python-ead7d6b126b0749fbc9b1102e26060c36af4df8b.tar.gz | |
QPID-1632 - Removal of reference counting and update to tests, TxAckTest was reduced in size as reference counting is now not modified until the transaction completes.
Replaced MessageReferenceCoutingTest with PersistentMessageTest and further tests wil be created when DerbyDBMessageStore is also updated.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@746260 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src/main')
19 files changed, 146 insertions, 232 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 125518358b..5a01888ccf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -499,7 +499,7 @@ public class AMQChannel } else { - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } @@ -555,7 +555,7 @@ public class AMQChannel _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } else @@ -712,7 +712,7 @@ public class AMQChannel { try { - message.discard(_storeContext); + message.dequeueAndDelete(_storeContext); message.setQueueDeleted(true); } @@ -831,9 +831,7 @@ public class AMQChannel { AMQMessage message = bouncedMessage.getAMQMessage(); _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); - - message.decrementReference(_storeContext); + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 1723d46ef0..8d41cc58d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 415f1fe8be..a81b2cc2db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException public void setMessage(final AMQMessage payload) { - - // Increment the reference as this message is in the routing phase - // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the - // handler. So increment here. - payload.incrementReference(1); + // handler. + // Messages are all kept in memory now. Only queues can push messages out of memory. _amqMessage = payload; - } public AMQMessage getAMQMessage() diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 0f40e00624..918fcd8407 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.ack; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.HashMap; @@ -116,22 +115,20 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(storeContext); - + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + msg.dequeueAndDelete(storeContext); } } public void undoPrepare() { - //decrementReference is annoyingly untransactional (due to - //in memory counter) so if we failed in prepare for full - //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for (QueueEntry msg : _unacked.values()) - { - msg.getMessage().incrementReference(1); - } + //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the + // transactionLog. Only when the transaction succesfully completes will it perform any + // update of the internal transactionLog reference counting and any resulting message data deletion. + // The success or failure of the data deletion is not important to this transaction only that the ack has been + // successfully recorded. } public void commit(StoreContext storeContext) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index efdadd4922..ac3b0b5e49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - unacked.getValue().discard(storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + unacked.getValue().dequeueAndDelete(storeContext); it.remove(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..bd70cd7776 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR long deliveryTag = body.getDeliveryTag(); - QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag); - if (message == null) + if (queueEntry == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); // throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { - if (message.isQueueDeleted()) + if (queueEntry.isQueueDeleted()) { _logger.warn("Message's Queue as already been purged, unable to Reject. " + "Dropping message should use Dead Letter Queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - if(message != null) + queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag); + if(queueEntry != null) { - message.discard(channel.getStoreContext()); + queueEntry.dequeueAndDelete(channel.getStoreContext()); } //sendtoDeadLetterQueue(msg) return; } - if (!message.getMessage().isReferenced()) + if (queueEntry.isDeleted()) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("QueueEntry as already been deleted, unable to Reject."); return; } if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); @@ -105,7 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR // If we haven't requested message to be resent to this consumer then reject it from ever getting it. //if (!evt.getMethod().resend) { - message.reject(); + queueEntry.reject(); } if (body.getRequeue()) @@ -115,7 +115,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR else { _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); + queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag); //sendtoDeadLetterQueue(AMQMessage message) // message.queue = channel.getDefaultDeadLetterQueue(); // channel.requeue(deliveryTag); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 1f56b2ccd2..e96d2ba874 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -114,17 +114,8 @@ public interface AMQMessage throws AMQException; - void removeMessage(StoreContext storeContext) throws AMQException; String toString(); String debugIdentity(); - - // Reference counting methods - - void decrementReference(StoreContext storeContext) throws MessageCleanupException; - - boolean incrementReference(int queueCount); - - boolean isReferenced(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0838b71c54..9fadbb0cdc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -81,25 +81,18 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> boolean isDeleted(); - int delete() throws AMQException; - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); - List<QueueEntry> getMessagesOnTheQueue(); List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 5d4322c4fc..5eafd281c0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues); } - try - { + // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); @@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable<RuntimeException> { int offset; final int queueCount = _destinationQueues.size(); - _message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable<RuntimeException> } return _message; - } - finally - { - // Remove refence for routing process . Reference count should now == delivered queue count - if(_message != null) _message.decrementReference(_txnContext.getStoreContext()); - } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index ec48a2afb0..92c10b0347 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -58,13 +58,6 @@ public class PersistentAMQMessage extends TransientAMQMessage } @Override - public void removeMessage(StoreContext storeContext) throws AMQException - { - _log.info("PAMQM : removing message:" + _messageId); - _transactionLog.removeMessage(storeContext, _messageId); - } - - @Override public boolean isPersistent() { return true; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 0df976a620..09600b9d28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept public abstract State getState(); } - public final class AvailableState extends EntryState { @@ -59,7 +58,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class DequeuedState extends EntryState { @@ -69,7 +67,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class DeletedState extends EntryState { @@ -88,7 +85,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class NonSubscriptionAcquiredState extends EntryState { public State getState() @@ -106,7 +102,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept _subscription = subscription; } - public State getState() { return State.ACQUIRED; @@ -118,16 +113,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - final static EntryState AVAILABLE_STATE = new AvailableState(); final static EntryState DELETED_STATE = new DeletedState(); final static EntryState DEQUEUED_STATE = new DequeuedState(); final static EntryState EXPIRED_STATE = new ExpiredState(); final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); - - - AMQQueue getQueue(); AMQMessage getMessage(); @@ -141,9 +132,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean isAcquired(); boolean acquire(); + boolean acquire(Subscription sub); boolean delete(); + boolean isDeleted(); boolean acquiredBySubscription(); @@ -170,12 +163,21 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void dequeue(final StoreContext storeContext) throws FailedDequeueException; - void dispose(final StoreContext storeContext) throws MessageCleanupException; - - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; + /** + * Message has been ack so dequeueAndDelete it. + * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + * from the transaciton log + * + * @param storeContext the transactional Context in which to perform the deletion + * + * @throws FailedDequeueException + * @throws MessageCleanupException + */ + void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException; boolean isQueueDeleted(); void addStateChangeListener(StateChangeListener listener); + boolean removeStateChangeListener(StateChangeListener listener); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 3eb1636884..911ed8321b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -282,13 +282,12 @@ public class QueueEntryImpl implements QueueEntry } getQueue().dequeue(storeContext, this); - if(_stateChangeListeners != null) + + if (_stateChangeListeners != null) { - notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED); } - } - } private void notifyStateChange(final State oldState, final State newState) @@ -299,29 +298,15 @@ public class QueueEntryImpl implements QueueEntry } } - public void dispose(final StoreContext storeContext) throws MessageCleanupException - { - _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state); - if(delete()) - { - _log.info("QEI delete message:" + getMessage().getMessageId()); - getMessage().decrementReference(storeContext); - } - else - { - _log.info("QEI delete state wrong:" + getMessage().getMessageId()); - } - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { - //if the queue is null then the message is waiting to be acked, but has been removed. + //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d; if (getQueue() != null) { dequeue(storeContext); } - dispose(storeContext); + delete(); } public boolean isQueueDeleted() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a0f21033c7..501e90b4d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -408,8 +408,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (entry.immediateAndNotDelivered()) { - dequeue(storeContext, entry); - entry.dispose(storeContext); + //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content + // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks + // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses. + entry.acquire(); + entry.dequeueAndDelete(storeContext); } else if (!(entry.isAcquired() || entry.isDeleted())) { @@ -562,6 +565,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + /** + * Only call from queue Entry + * @param storeContext + * @param entry + * @throws FailedDequeueException + */ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { decrementQueueCount(); @@ -578,7 +587,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId()); } - //entry.dispose(storeContext); } catch (MessageCleanupException e) @@ -814,11 +822,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, StoreContext storeContext) { + // The move is a two step process. First the messages are moved in the _transactionLog. + // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the + // existing queue. + // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery + // this is done as the message is recieved. + // So The final step is to enqueue the messages on the new queue. AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); TransactionLog transactionLog = getVirtualHost().getTransactionLog(); @@ -844,7 +859,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { transactionLog.beginTran(storeContext); - // Move the messages in on the transaction log. + // Move the messages in the transaction log. for (QueueEntry entry : entries) { AMQMessage message = entry.getMessage(); @@ -853,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } - // dequeue does not decrement the refence count + // dequeue will remove the messages from the queue entry.dequeue(storeContext); } @@ -882,10 +897,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { + // Add messages to new queue for (QueueEntry entry : entries) { toQueue.enqueue(storeContext, entry.getMessage()); - } } catch (MessageCleanupException e) @@ -918,7 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!entry.isDeleted()) { - return entry.getMessage().incrementReference(1); + return true; } } @@ -940,7 +955,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQMessage message = entry.getMessage(); - if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable()) { transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } @@ -973,7 +988,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { for (QueueEntry entry : entries) { - if (entry.getMessage().isReferenced()) + if (!entry.isDeleted()) { toQueue.enqueue(storeContext, entry.getMessage()); } @@ -1008,7 +1023,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener && !node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); } } @@ -1032,7 +1047,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); noDeletes = false; } @@ -1050,7 +1065,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); count++; } @@ -1315,8 +1330,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (node.acquire()) { + // creating a new final store context per message seems wasteful. final StoreContext reapingStoreContext = new StoreContext(); - node.discard(reapingStoreContext); + node.dequeueAndDelete(reapingStoreContext); } } QueueEntry newNode = _entries.next(node); @@ -1431,7 +1447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.expired() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index f3d74fb01c..fa4e85a043 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -172,11 +172,6 @@ public class TransientAMQMessage implements AMQMessage _expiration = expiration; } - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } - public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - if (_referenceCount.addAndGet(count) <= 1) - { - int newcount = _referenceCount.addAndGet(-count); - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount); - return false; - } - else - { - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1(" - + _referenceCount.get() + ")"); - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * @param storeContext - * - * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException - { - - int count = _referenceCount.decrementAndGet(); - - _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE / 2); - - try - { - _log.debug("Reference Count hit 0, removing message"); - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op - removeMessage(storeContext); - } - catch (AMQException e) - { - // to maintain consistency, we revert the count - incrementReference(1); - throw new MessageCleanupException(getMessageId(), e); - } - } - else - { - if (count < 0) - { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - /** * Called selectors to determin if the message has already been sent * @@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage return _arrivalTime; } - public void removeMessage(StoreContext storeContext) throws AMQException - { - //no-op - } - public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index fe81346c8c..33b3d8608e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable if(message != null) { - message.incrementReference(1); + //todo must enqueue message to build reference table +// message.incrementReference(1); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index aa7b6e2542..cd0f0c1769 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -/** A simple message store that stores the messages in a threadsafe structure in memory. +/** + * A simple message store that stores the messages in a threadsafe structure in memory. * * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog * * This class really should have no storage unless we want to do inMemory Recovery. - * */ public class MemoryMessageStore implements TransactionLog, RoutingTable { @@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); + protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>(); public void configure() { @@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } _metaDataMap.remove(messageId); _contentBodyMap.remove(messageId); + _messageEnqueueMap.remove(messageId); } public void createExchange(Exchange exchange) throws AMQException @@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } - public void createQueue(AMQQueue queue) throws AMQException { // Not requred to do anything @@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List<AMQQueue> queues = _messageEnqueueMap.get(messageId); + if (queues == null) + { + queues = new LinkedList<AMQQueue>(); + _messageEnqueueMap.put(messageId, queues); + } + + queues.add(queue); + } } public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List<AMQQueue> queues = _messageEnqueueMap.get(messageId); + if (queues == null || !queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName() + + " but it is not enqueued on that queue."); + } + else + { + queues.remove(queue); + if (queues.isEmpty()) + { + removeMessage(context,messageId); + } + } + } + } public void beginTran(StoreContext context) throws AMQException @@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } private void checkNotClosed() throws MessageStoreClosedException - { + { if (_closed.get()) { throw new MessageStoreClosedException(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 1c58e644e9..119a4b1692 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage StoreContext storeContext = getChannel().getStoreContext(); try - { // if we do not need to wait for client acknowledgements + { + // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. // By doing this _before_ the send we ensure that it @@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - entry.dequeue(storeContext); + entry.dequeueAndDelete(storeContext); synchronized (getChannel()) @@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage sendToClient(entry, deliveryTag); } - entry.dispose(storeContext); } finally { @@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _autoClose = false; } - + _logger.info(debugIdentity()+" Created subscription:"); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 8e63b95f0d..abfb60c5bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(1); - try - { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); if(entry.immediateAndNotDelivered()) { getReturnMessages().add(new NoConsumersException(_message)); } - } - finally - { - _message.decrementReference(getStoreContext()); - } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 145d7f8b13..561f998b98 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.size()); unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { if (debug) { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(message.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - message.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + queueEntry.dequeueAndDelete(_storeContext); return false; } @@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext } else { - QueueEntry msg; - msg = unacknowledgedMessageMap.get(deliveryTag); + QueueEntry queueEntry; + queueEntry = unacknowledgedMessageMap.get(deliveryTag); - if (msg == null) + if (queueEntry == null) { _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); @@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(msg.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + queueEntry.dequeueAndDelete(_storeContext); unacknowledgedMessageMap.remove(deliveryTag); @@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.getMessage().getMessageId()); + queueEntry.getMessage().getMessageId()); } } if(_inTran) |
