diff options
Diffstat (limited to 'java/broker/src/main')
19 files changed, 146 insertions, 232 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 125518358b..5a01888ccf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -499,7 +499,7 @@ public class AMQChannel } else { - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } @@ -555,7 +555,7 @@ public class AMQChannel _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); - unacked.discard(_storeContext); + unacked.dequeueAndDelete(_storeContext); } } else @@ -712,7 +712,7 @@ public class AMQChannel { try { - message.discard(_storeContext); + message.dequeueAndDelete(_storeContext); message.setQueueDeleted(true); } @@ -831,9 +831,7 @@ public class AMQChannel { AMQMessage message = bouncedMessage.getAMQMessage(); _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); - - message.decrementReference(_storeContext); + new AMQShortString(bouncedMessage.getMessage())); } _returnMessages.clear(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 1723d46ef0..8d41cc58d2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - queueEntry.discard(_storeContext); + queueEntry.dequeueAndDelete(_storeContext); _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 415f1fe8be..a81b2cc2db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException public void setMessage(final AMQMessage payload) { - - // Increment the reference as this message is in the routing phase - // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the - // handler. So increment here. - payload.incrementReference(1); + // handler. + // Messages are all kept in memory now. Only queues can push messages out of memory. _amqMessage = payload; - } public AMQMessage getAMQMessage() diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 0f40e00624..918fcd8407 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.ack; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.HashMap; @@ -116,22 +115,20 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(storeContext); - + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + msg.dequeueAndDelete(storeContext); } } public void undoPrepare() { - //decrementReference is annoyingly untransactional (due to - //in memory counter) so if we failed in prepare for full - //txn, this op will have to compensate by fixing the count - //in memory (persistent changes will be rolled back by store) - for (QueueEntry msg : _unacked.values()) - { - msg.getMessage().incrementReference(1); - } + //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the + // transactionLog. Only when the transaction succesfully completes will it perform any + // update of the internal transactionLog reference counting and any resulting message data deletion. + // The success or failure of the data deletion is not important to this transaction only that the ack has been + // successfully recorded. } public void commit(StoreContext storeContext) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index efdadd4922..ac3b0b5e49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString()); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - unacked.getValue().discard(storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + unacked.getValue().dequeueAndDelete(storeContext); it.remove(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..bd70cd7776 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR long deliveryTag = body.getDeliveryTag(); - QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag); - if (message == null) + if (queueEntry == null) { _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); // throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); } else { - if (message.isQueueDeleted()) + if (queueEntry.isQueueDeleted()) { _logger.warn("Message's Queue as already been purged, unable to Reject. " + "Dropping message should use Dead Letter Queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); - if(message != null) + queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag); + if(queueEntry != null) { - message.discard(channel.getStoreContext()); + queueEntry.dequeueAndDelete(channel.getStoreContext()); } //sendtoDeadLetterQueue(msg) return; } - if (!message.getMessage().isReferenced()) + if (queueEntry.isDeleted()) { - _logger.warn("Message as already been purged, unable to Reject."); + _logger.warn("QueueEntry as already been deleted, unable to Reject."); return; } if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); @@ -105,7 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR // If we haven't requested message to be resent to this consumer then reject it from ever getting it. //if (!evt.getMethod().resend) { - message.reject(); + queueEntry.reject(); } if (body.getRequeue()) @@ -115,7 +115,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR else { _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); - message = channel.getUnacknowledgedMessageMap().remove(deliveryTag); + queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag); //sendtoDeadLetterQueue(AMQMessage message) // message.queue = channel.getDefaultDeadLetterQueue(); // channel.requeue(deliveryTag); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 1f56b2ccd2..e96d2ba874 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -114,17 +114,8 @@ public interface AMQMessage throws AMQException; - void removeMessage(StoreContext storeContext) throws AMQException; String toString(); String debugIdentity(); - - // Reference counting methods - - void decrementReference(StoreContext storeContext) throws MessageCleanupException; - - boolean incrementReference(int queueCount); - - boolean isReferenced(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0838b71c54..9fadbb0cdc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -81,25 +81,18 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> boolean isDeleted(); - int delete() throws AMQException; - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException; - - boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException; - - void addQueueDeleteTask(final Task task); - List<QueueEntry> getMessagesOnTheQueue(); List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 5d4322c4fc..5eafd281c0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable<RuntimeException> _logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues); } - try - { + // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody()); @@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable<RuntimeException> { int offset; final int queueCount = _destinationQueues.size(); - _message.incrementReference(queueCount); if(queueCount == 1) { offset = 0; @@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable<RuntimeException> } return _message; - } - finally - { - // Remove refence for routing process . Reference count should now == delivered queue count - if(_message != null) _message.decrementReference(_txnContext.getStoreContext()); - } + + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java index ec48a2afb0..92c10b0347 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java @@ -58,13 +58,6 @@ public class PersistentAMQMessage extends TransientAMQMessage } @Override - public void removeMessage(StoreContext storeContext) throws AMQException - { - _log.info("PAMQM : removing message:" + _messageId); - _transactionLog.removeMessage(storeContext, _messageId); - } - - @Override public boolean isPersistent() { return true; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 0df976a620..09600b9d28 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept public abstract State getState(); } - public final class AvailableState extends EntryState { @@ -59,7 +58,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class DequeuedState extends EntryState { @@ -69,7 +67,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class DeletedState extends EntryState { @@ -88,7 +85,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - public final class NonSubscriptionAcquiredState extends EntryState { public State getState() @@ -106,7 +102,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept _subscription = subscription; } - public State getState() { return State.ACQUIRED; @@ -118,16 +113,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept } } - final static EntryState AVAILABLE_STATE = new AvailableState(); final static EntryState DELETED_STATE = new DeletedState(); final static EntryState DEQUEUED_STATE = new DequeuedState(); final static EntryState EXPIRED_STATE = new ExpiredState(); final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState(); - - - AMQQueue getQueue(); AMQMessage getMessage(); @@ -141,9 +132,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept boolean isAcquired(); boolean acquire(); + boolean acquire(Subscription sub); boolean delete(); + boolean isDeleted(); boolean acquiredBySubscription(); @@ -170,12 +163,21 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void dequeue(final StoreContext storeContext) throws FailedDequeueException; - void dispose(final StoreContext storeContext) throws MessageCleanupException; - - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; + /** + * Message has been ack so dequeueAndDelete it. + * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + * from the transaciton log + * + * @param storeContext the transactional Context in which to perform the deletion + * + * @throws FailedDequeueException + * @throws MessageCleanupException + */ + void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException; boolean isQueueDeleted(); void addStateChangeListener(StateChangeListener listener); + boolean removeStateChangeListener(StateChangeListener listener); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 3eb1636884..911ed8321b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -282,13 +282,12 @@ public class QueueEntryImpl implements QueueEntry } getQueue().dequeue(storeContext, this); - if(_stateChangeListeners != null) + + if (_stateChangeListeners != null) { - notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED); + notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED); } - } - } private void notifyStateChange(final State oldState, final State newState) @@ -299,29 +298,15 @@ public class QueueEntryImpl implements QueueEntry } } - public void dispose(final StoreContext storeContext) throws MessageCleanupException - { - _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state); - if(delete()) - { - _log.info("QEI delete message:" + getMessage().getMessageId()); - getMessage().decrementReference(storeContext); - } - else - { - _log.info("QEI delete state wrong:" + getMessage().getMessageId()); - } - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException + public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException { - //if the queue is null then the message is waiting to be acked, but has been removed. + //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d; if (getQueue() != null) { dequeue(storeContext); } - dispose(storeContext); + delete(); } public boolean isQueueDeleted() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a0f21033c7..501e90b4d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -408,8 +408,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (entry.immediateAndNotDelivered()) { - dequeue(storeContext, entry); - entry.dispose(storeContext); + //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content + // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks + // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses. + entry.acquire(); + entry.dequeueAndDelete(storeContext); } else if (!(entry.isAcquired() || entry.isDeleted())) { @@ -562,6 +565,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + /** + * Only call from queue Entry + * @param storeContext + * @param entry + * @throws FailedDequeueException + */ public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException { decrementQueueCount(); @@ -578,7 +587,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId()); } - //entry.dispose(storeContext); } catch (MessageCleanupException e) @@ -814,11 +822,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void moveMessagesToAnotherQueue(final long fromMessageId, final long toMessageId, String queueName, StoreContext storeContext) { + // The move is a two step process. First the messages are moved in the _transactionLog. + // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the + // existing queue. + // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery + // this is done as the message is recieved. + // So The final step is to enqueue the messages on the new queue. AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName)); TransactionLog transactionLog = getVirtualHost().getTransactionLog(); @@ -844,7 +859,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { transactionLog.beginTran(storeContext); - // Move the messages in on the transaction log. + // Move the messages in the transaction log. for (QueueEntry entry : entries) { AMQMessage message = entry.getMessage(); @@ -853,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } - // dequeue does not decrement the refence count + // dequeue will remove the messages from the queue entry.dequeue(storeContext); } @@ -882,10 +897,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { + // Add messages to new queue for (QueueEntry entry : entries) { toQueue.enqueue(storeContext, entry.getMessage()); - } } catch (MessageCleanupException e) @@ -918,7 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (!entry.isDeleted()) { - return entry.getMessage().incrementReference(1); + return true; } } @@ -940,7 +955,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { AMQMessage message = entry.getMessage(); - if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable()) { transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); } @@ -973,7 +988,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { for (QueueEntry entry : entries) { - if (entry.getMessage().isReferenced()) + if (!entry.isDeleted()) { toQueue.enqueue(storeContext, entry.getMessage()); } @@ -1008,7 +1023,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener && !node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); } } @@ -1032,7 +1047,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); noDeletes = false; } @@ -1050,7 +1065,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); count++; } @@ -1315,8 +1330,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (node.acquire()) { + // creating a new final store context per message seems wasteful. final StoreContext reapingStoreContext = new StoreContext(); - node.discard(reapingStoreContext); + node.dequeueAndDelete(reapingStoreContext); } } QueueEntry newNode = _entries.next(node); @@ -1431,7 +1447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.expired() && node.acquire()) { - node.discard(storeContext); + node.dequeueAndDelete(storeContext); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java index f3d74fb01c..fa4e85a043 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java @@ -172,11 +172,6 @@ public class TransientAMQMessage implements AMQMessage _expiration = expiration; } - public boolean isReferenced() - { - return _referenceCount.get() > 0; - } - public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel) { return new BodyFrameIterator(protocolSession, channel); @@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage return _messageId; } - /* Threadsafe. Increment the reference count on the message. */ - public boolean incrementReference(int count) - { - if (_referenceCount.addAndGet(count) <= 1) - { - int newcount = _referenceCount.addAndGet(-count); - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount); - return false; - } - else - { - _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1(" - + _referenceCount.get() + ")"); - return true; - } - - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - * - * @param storeContext - * - * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that - * failed - */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException - { - - int count = _referenceCount.decrementAndGet(); - - _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _referenceCount.set(Integer.MIN_VALUE / 2); - - try - { - _log.debug("Reference Count hit 0, removing message"); - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op - removeMessage(storeContext); - } - catch (AMQException e) - { - // to maintain consistency, we revert the count - incrementReference(1); - throw new MessageCleanupException(getMessageId(), e); - } - } - else - { - if (count < 0) - { - throw new MessageCleanupException("Reference count for message id " + debugIdentity() - + " has gone below 0."); - } - } - } - /** * Called selectors to determin if the message has already been sent * @@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage return _arrivalTime; } - public void removeMessage(StoreContext storeContext) throws AMQException - { - //no-op - } - public String toString() { // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index fe81346c8c..33b3d8608e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable if(message != null) { - message.incrementReference(1); + //todo must enqueue message to build reference table +// message.incrementReference(1); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index aa7b6e2542..cd0f0c1769 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.routing.RoutingTable; +import org.apache.qpid.server.transactionlog.TransactionLog; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -/** A simple message store that stores the messages in a threadsafe structure in memory. +/** + * A simple message store that stores the messages in a threadsafe structure in memory. * * NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog * * This class really should have no storage unless we want to do inMemory Recovery. - * */ public class MemoryMessageStore implements TransactionLog, RoutingTable { @@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); + protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>(); public void configure() { @@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } _metaDataMap.remove(messageId); _contentBodyMap.remove(messageId); + _messageEnqueueMap.remove(messageId); } public void createExchange(Exchange exchange) throws AMQException @@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } - public void createQueue(AMQQueue queue) throws AMQException { // Not requred to do anything @@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List<AMQQueue> queues = _messageEnqueueMap.get(messageId); + if (queues == null) + { + queues = new LinkedList<AMQQueue>(); + _messageEnqueueMap.put(messageId, queues); + } + + queues.add(queue); + } } public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException { - // Not required to do anything + synchronized (_messageEnqueueMap) + { + List<AMQQueue> queues = _messageEnqueueMap.get(messageId); + if (queues == null || !queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName() + + " but it is not enqueued on that queue."); + } + else + { + queues.remove(queue); + if (queues.isEmpty()) + { + removeMessage(context,messageId); + } + } + } + } public void beginTran(StoreContext context) throws AMQException @@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable } private void checkNotClosed() throws MessageStoreClosedException - { + { if (_closed.get()) { throw new MessageStoreClosedException(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 1c58e644e9..119a4b1692 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage StoreContext storeContext = getChannel().getStoreContext(); try - { // if we do not need to wait for client acknowledgements + { + // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. // By doing this _before_ the send we ensure that it @@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - entry.dequeue(storeContext); + entry.dequeueAndDelete(storeContext); synchronized (getChannel()) @@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage sendToClient(entry, deliveryTag); } - entry.dispose(storeContext); } finally { @@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _autoClose = false; } - + _logger.info(debugIdentity()+" Created subscription:"); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 8e63b95f0d..abfb60c5bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(1); - try - { QueueEntry entry = _queue.enqueue(getStoreContext(),_message); if(entry.immediateAndNotDelivered()) { getReturnMessages().add(new NoConsumersException(_message)); } - } - finally - { - _message.decrementReference(getStoreContext()); - } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 145d7f8b13..561f998b98 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext unacknowledgedMessageMap.size()); unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { if (debug) { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(message.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - message.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + queueEntry.dequeueAndDelete(_storeContext); return false; } @@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext } else { - QueueEntry msg; - msg = unacknowledgedMessageMap.get(deliveryTag); + QueueEntry queueEntry; + queueEntry = unacknowledgedMessageMap.get(deliveryTag); - if (msg == null) + if (queueEntry == null) { _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); @@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); } - if(msg.getMessage().isPersistent()) + if(queueEntry.getMessage().isPersistent()) { beginTranIfNecessary(); } - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); + //Message has been ack so dequeueAndDelete it. + // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed + // from the transaciton log + queueEntry.dequeueAndDelete(_storeContext); unacknowledgedMessageMap.remove(deliveryTag); @@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.getMessage().getMessageId()); + queueEntry.getMessage().getMessageId()); } } if(_inTran) |
