summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java27
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java80
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java48
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java28
19 files changed, 146 insertions, 232 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 125518358b..5a01888ccf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -499,7 +499,7 @@ public class AMQChannel
}
else
{
- unacked.discard(_storeContext);
+ unacked.dequeueAndDelete(_storeContext);
}
}
@@ -555,7 +555,7 @@ public class AMQChannel
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- unacked.discard(_storeContext);
+ unacked.dequeueAndDelete(_storeContext);
}
}
else
@@ -712,7 +712,7 @@ public class AMQChannel
{
try
{
- message.discard(_storeContext);
+ message.dequeueAndDelete(_storeContext);
message.setQueueDeleted(true);
}
@@ -831,9 +831,7 @@ public class AMQChannel
{
AMQMessage message = bouncedMessage.getAMQMessage();
_session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
-
- message.decrementReference(_storeContext);
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
index 1723d46ef0..8d41cc58d2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
@@ -82,13 +82,13 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
}
else
{
- queueEntry.discard(_storeContext);
+ queueEntry.dequeueAndDelete(_storeContext);
_log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry);
}
}
else
{
- queueEntry.discard(_storeContext);
+ queueEntry.dequeueAndDelete(_storeContext);
_log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 415f1fe8be..a81b2cc2db 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -56,14 +56,10 @@ public abstract class RequiredDeliveryException extends AMQException
public void setMessage(final AMQMessage payload)
{
-
- // Increment the reference as this message is in the routing phase
- // and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- payload.incrementReference(1);
+ // handler.
+ // Messages are all kept in memory now. Only queues can push messages out of memory.
_amqMessage = payload;
-
}
public AMQMessage getAMQMessage()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index 0f40e00624..918fcd8407 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.ack;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
@@ -116,22 +115,20 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (QueueEntry msg : _unacked.values())
{
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(storeContext);
-
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ msg.dequeueAndDelete(storeContext);
}
}
public void undoPrepare()
{
- //decrementReference is annoyingly untransactional (due to
- //in memory counter) so if we failed in prepare for full
- //txn, this op will have to compensate by fixing the count
- //in memory (persistent changes will be rolled back by store)
- for (QueueEntry msg : _unacked.values())
- {
- msg.getMessage().incrementReference(1);
- }
+ //As this is transaction the above dequeueAndDelete will only request the message be dequeue from the
+ // transactionLog. Only when the transaction succesfully completes will it perform any
+ // update of the internal transactionLog reference counting and any resulting message data deletion.
+ // The success or failure of the data deletion is not important to this transaction only that the ack has been
+ // successfully recorded.
}
public void commit(StoreContext storeContext)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index efdadd4922..ac3b0b5e49 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -174,8 +174,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- unacked.getValue().discard(storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ unacked.getValue().dequeueAndDelete(storeContext);
it.remove();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..bd70cd7776 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -65,38 +65,38 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ QueueEntry queueEntry = channel.getUnacknowledgedMessageMap().get(deliveryTag);
- if (message == null)
+ if (queueEntry == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
- if (message.isQueueDeleted())
+ if (queueEntry.isQueueDeleted())
{
_logger.warn("Message's Queue as already been purged, unable to Reject. " +
"Dropping message should use Dead Letter Queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
+ queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ if(queueEntry != null)
{
- message.discard(channel.getStoreContext());
+ queueEntry.dequeueAndDelete(channel.getStoreContext());
}
//sendtoDeadLetterQueue(msg)
return;
}
- if (!message.getMessage().isReferenced())
+ if (queueEntry.isDeleted())
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("QueueEntry as already been deleted, unable to Reject.");
return;
}
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
@@ -105,7 +105,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
//if (!evt.getMethod().resend)
{
- message.reject();
+ queueEntry.reject();
}
if (body.getRequeue())
@@ -115,7 +115,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
else
{
_logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
+ queueEntry = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
//sendtoDeadLetterQueue(AMQMessage message)
// message.queue = channel.getDefaultDeadLetterQueue();
// channel.requeue(deliveryTag);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 1f56b2ccd2..e96d2ba874 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -114,17 +114,8 @@ public interface AMQMessage
throws AMQException;
- void removeMessage(StoreContext storeContext) throws AMQException;
String toString();
String debugIdentity();
-
- // Reference counting methods
-
- void decrementReference(StoreContext storeContext) throws MessageCleanupException;
-
- boolean incrementReference(int queueCount);
-
- boolean isReferenced();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0838b71c54..9fadbb0cdc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -81,25 +81,18 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
boolean isDeleted();
-
int delete() throws AMQException;
-
QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
-
-
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
-
void addQueueDeleteTask(final Task task);
-
List<QueueEntry> getMessagesOnTheQueue();
List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 5d4322c4fc..5eafd281c0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -156,8 +156,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
_logger.debug("Delivering message " + getMessageId() + " to " + _destinationQueues);
}
- try
- {
+
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_message.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), _messagePublishInfo, getContentHeaderBody());
@@ -196,7 +195,6 @@ public class IncomingMessage implements Filterable<RuntimeException>
{
int offset;
final int queueCount = _destinationQueues.size();
- _message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
@@ -222,12 +220,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
return _message;
- }
- finally
- {
- // Remove refence for routing process . Reference count should now == delivered queue count
- if(_message != null) _message.decrementReference(_txnContext.getStoreContext());
- }
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
index ec48a2afb0..92c10b0347 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
@@ -58,13 +58,6 @@ public class PersistentAMQMessage extends TransientAMQMessage
}
@Override
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- _log.info("PAMQM : removing message:" + _messageId);
- _transactionLog.removeMessage(storeContext, _messageId);
- }
-
- @Override
public boolean isPersistent()
{
return true;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 0df976a620..09600b9d28 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -49,7 +49,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
public abstract State getState();
}
-
public final class AvailableState extends EntryState
{
@@ -59,7 +58,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class DequeuedState extends EntryState
{
@@ -69,7 +67,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class DeletedState extends EntryState
{
@@ -88,7 +85,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
public final class NonSubscriptionAcquiredState extends EntryState
{
public State getState()
@@ -106,7 +102,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
_subscription = subscription;
}
-
public State getState()
{
return State.ACQUIRED;
@@ -118,16 +113,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
}
}
-
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
final static EntryState DEQUEUED_STATE = new DequeuedState();
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
AMQQueue getQueue();
AMQMessage getMessage();
@@ -141,9 +132,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
boolean isAcquired();
boolean acquire();
+
boolean acquire(Subscription sub);
boolean delete();
+
boolean isDeleted();
boolean acquiredBySubscription();
@@ -170,12 +163,21 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void dequeue(final StoreContext storeContext) throws FailedDequeueException;
- void dispose(final StoreContext storeContext) throws MessageCleanupException;
-
- void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+ /**
+ * Message has been ack so dequeueAndDelete it.
+ * If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ * from the transaciton log
+ *
+ * @param storeContext the transactional Context in which to perform the deletion
+ *
+ * @throws FailedDequeueException
+ * @throws MessageCleanupException
+ */
+ void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException;
boolean isQueueDeleted();
void addStateChangeListener(StateChangeListener listener);
+
boolean removeStateChangeListener(StateChangeListener listener);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 3eb1636884..911ed8321b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -282,13 +282,12 @@ public class QueueEntryImpl implements QueueEntry
}
getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+
+ if (_stateChangeListeners != null)
{
- notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ notifyStateChange(state.getState(), QueueEntry.State.DEQUEUED);
}
-
}
-
}
private void notifyStateChange(final State oldState, final State newState)
@@ -299,29 +298,15 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
- {
- _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
- if(delete())
- {
- _log.info("QEI delete message:" + getMessage().getMessageId());
- getMessage().decrementReference(storeContext);
- }
- else
- {
- _log.info("QEI delete state wrong:" + getMessage().getMessageId());
- }
- }
-
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
- //if the queue is null then the message is waiting to be acked, but has been removed.
+ //if the queue is null (i.e. queue.delete()'d) then the message is waiting to be acked, but has already be delete()'d;
if (getQueue() != null)
{
dequeue(storeContext);
}
- dispose(storeContext);
+ delete();
}
public boolean isQueueDeleted()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a0f21033c7..501e90b4d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -408,8 +408,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (entry.immediateAndNotDelivered())
{
- dequeue(storeContext, entry);
- entry.dispose(storeContext);
+ //We acquire the message here to ensure that the dequeueAndDelete will correctly remove the content
+ // from the transactionLog. This saves us from having to have a custom dequeueAndDelete that checks
+ // for the AVAILABLE state of an entry rather than the ACQUIRED state that it currently uses.
+ entry.acquire();
+ entry.dequeueAndDelete(storeContext);
}
else if (!(entry.isAcquired() || entry.isDeleted()))
{
@@ -562,6 +565,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ /**
+ * Only call from queue Entry
+ * @param storeContext
+ * @param entry
+ * @throws FailedDequeueException
+ */
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
decrementQueueCount();
@@ -578,7 +587,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
_virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
}
- //entry.dispose(storeContext);
}
catch (MessageCleanupException e)
@@ -814,11 +822,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
String queueName,
StoreContext storeContext)
{
+ // The move is a two step process. First the messages are moved in the _transactionLog.
+ // That is persistent messages are moved queues on disk for recovery and the QueueEntries removed from the
+ // existing queue.
+ // This is done as Queue.enqueue() does not write the data to the transactionLog. In normal message delivery
+ // this is done as the message is recieved.
+ // So The final step is to enqueue the messages on the new queue.
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
TransactionLog transactionLog = getVirtualHost().getTransactionLog();
@@ -844,7 +859,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
transactionLog.beginTran(storeContext);
- // Move the messages in on the transaction log.
+ // Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
AMQMessage message = entry.getMessage();
@@ -853,7 +868,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
- // dequeue does not decrement the refence count
+ // dequeue will remove the messages from the queue
entry.dequeue(storeContext);
}
@@ -882,10 +897,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
+ // Add messages to new queue
for (QueueEntry entry : entries)
{
toQueue.enqueue(storeContext, entry.getMessage());
-
}
}
catch (MessageCleanupException e)
@@ -918,7 +933,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (!entry.isDeleted())
{
- return entry.getMessage().incrementReference(1);
+ return true;
}
}
@@ -940,7 +955,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
AMQMessage message = entry.getMessage();
- if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
{
transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
@@ -973,7 +988,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
for (QueueEntry entry : entries)
{
- if (entry.getMessage().isReferenced())
+ if (!entry.isDeleted())
{
toQueue.enqueue(storeContext, entry.getMessage());
}
@@ -1008,7 +1023,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
&& !node.isDeleted()
&& node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
}
}
@@ -1032,7 +1047,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
noDeletes = false;
}
@@ -1050,7 +1065,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
count++;
}
@@ -1315,8 +1330,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (node.acquire())
{
+ // creating a new final store context per message seems wasteful.
final StoreContext reapingStoreContext = new StoreContext();
- node.discard(reapingStoreContext);
+ node.dequeueAndDelete(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
@@ -1431,7 +1447,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = queueListIterator.getNode();
if (!node.isDeleted() && node.expired() && node.acquire())
{
- node.discard(storeContext);
+ node.dequeueAndDelete(storeContext);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index f3d74fb01c..fa4e85a043 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -172,11 +172,6 @@ public class TransientAMQMessage implements AMQMessage
_expiration = expiration;
}
- public boolean isReferenced()
- {
- return _referenceCount.get() > 0;
- }
-
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
return new BodyFrameIterator(protocolSession, channel);
@@ -197,76 +192,6 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /* Threadsafe. Increment the reference count on the message. */
- public boolean incrementReference(int count)
- {
- if (_referenceCount.addAndGet(count) <= 1)
- {
- int newcount = _referenceCount.addAndGet(-count);
- _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
- return false;
- }
- else
- {
- _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
- + _referenceCount.get() + ")");
- return true;
- }
-
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- *
- * @param storeContext
- *
- * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
- * failed
- */
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
- {
-
- int count = _referenceCount.decrementAndGet();
-
- _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _referenceCount.set(Integer.MIN_VALUE / 2);
-
- try
- {
- _log.debug("Reference Count hit 0, removing message");
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
- removeMessage(storeContext);
- }
- catch (AMQException e)
- {
- // to maintain consistency, we revert the count
- incrementReference(1);
- throw new MessageCleanupException(getMessageId(), e);
- }
- }
- else
- {
- if (count < 0)
- {
- throw new MessageCleanupException("Reference count for message id " + debugIdentity()
- + " has gone below 0.");
- }
- }
- }
-
/**
* Called selectors to determin if the message has already been sent
*
@@ -435,11 +360,6 @@ public class TransientAMQMessage implements AMQMessage
return _arrivalTime;
}
- public void removeMessage(StoreContext storeContext) throws AMQException
- {
- //no-op
- }
-
public String toString()
{
// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index fe81346c8c..33b3d8608e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -1357,7 +1357,8 @@ public class DerbyMessageStore implements TransactionLog, RoutingTable
if(message != null)
{
- message.incrementReference(1);
+ //todo must enqueue message to build reference table
+// message.incrementReference(1);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index aa7b6e2542..cd0f0c1769 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -30,24 +30,28 @@ import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/** A simple message store that stores the messages in a threadsafe structure in memory.
+/**
+ * A simple message store that stores the messages in a threadsafe structure in memory.
*
* NOTE: Now that we have removed the MessageStore interface and are using a TransactionLog
*
* This class really should have no storage unless we want to do inMemory Recovery.
- *
*/
public class MemoryMessageStore implements TransactionLog, RoutingTable
{
@@ -63,6 +67,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
+ protected final Map<Long, List<AMQQueue>> _messageEnqueueMap = new HashMap<Long, List<AMQQueue>>();
public void configure()
{
@@ -112,6 +117,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
_metaDataMap.remove(messageId);
_contentBodyMap.remove(messageId);
+ _messageEnqueueMap.remove(messageId);
}
public void createExchange(Exchange exchange) throws AMQException
@@ -134,7 +140,6 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
-
public void createQueue(AMQQueue queue) throws AMQException
{
// Not requred to do anything
@@ -152,12 +157,39 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- // Not required to do anything
+ synchronized (_messageEnqueueMap)
+ {
+ List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+ if (queues == null)
+ {
+ queues = new LinkedList<AMQQueue>();
+ _messageEnqueueMap.put(messageId, queues);
+ }
+
+ queues.add(queue);
+ }
}
public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
- // Not required to do anything
+ synchronized (_messageEnqueueMap)
+ {
+ List<AMQQueue> queues = _messageEnqueueMap.get(messageId);
+ if (queues == null || !queues.contains(queue))
+ {
+ throw new RuntimeException("Attempt to dequeue messageID:" + messageId + " from queue:" + queue.getName()
+ + " but it is not enqueued on that queue.");
+ }
+ else
+ {
+ queues.remove(queue);
+ if (queues.isEmpty())
+ {
+ removeMessage(context,messageId);
+ }
+ }
+ }
+
}
public void beginTran(StoreContext context) throws AMQException
@@ -238,7 +270,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
}
private void checkNotClosed() throws MessageStoreClosedException
- {
+ {
if (_closed.get())
{
throw new MessageStoreClosedException();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 1c58e644e9..119a4b1692 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -144,7 +144,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
StoreContext storeContext = getChannel().getStoreContext();
try
- { // if we do not need to wait for client acknowledgements
+ {
+ // if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
// By doing this _before_ the send we ensure that it
@@ -153,7 +154,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
// The send may of course still fail, in which case, as
// the message is unacked, it will be lost.
- entry.dequeue(storeContext);
+ entry.dequeueAndDelete(storeContext);
synchronized (getChannel())
@@ -163,7 +164,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
sendToClient(entry, deliveryTag);
}
- entry.dispose(storeContext);
}
finally
{
@@ -316,7 +316,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_autoClose = false;
}
-
+ _logger.info(debugIdentity()+" Created subscription:");
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 8e63b95f0d..abfb60c5bf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -92,20 +92,12 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- _message.incrementReference(1);
- try
- {
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
if(entry.immediateAndNotDelivered())
{
getReturnMessages().add(new NoConsumersException(_message));
}
- }
- finally
- {
- _message.decrementReference(getStoreContext());
- }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 145d7f8b13..561f998b98 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -123,18 +123,18 @@ public class NonTransactionalContext implements TransactionalContext
unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException
{
if (debug)
{
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
}
- if(message.getMessage().isPersistent())
+ if(queueEntry.getMessage().isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- message.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ queueEntry.dequeueAndDelete(_storeContext);
return false;
}
@@ -157,10 +157,10 @@ public class NonTransactionalContext implements TransactionalContext
}
else
{
- QueueEntry msg;
- msg = unacknowledgedMessageMap.get(deliveryTag);
+ QueueEntry queueEntry;
+ queueEntry = unacknowledgedMessageMap.get(deliveryTag);
- if (msg == null)
+ if (queueEntry == null)
{
_log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
@@ -170,15 +170,17 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
}
- if(msg.getMessage().isPersistent())
+ if(queueEntry.getMessage().isPersistent())
{
beginTranIfNecessary();
}
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
+ //Message has been ack so dequeueAndDelete it.
+ // If the message is persistent and this is the last QueueEntry that uses it then the data will be removed
+ // from the transaciton log
+ queueEntry.dequeueAndDelete(_storeContext);
unacknowledgedMessageMap.remove(deliveryTag);
@@ -186,7 +188,7 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.getMessage().getMessageId());
+ queueEntry.getMessage().getMessageId());
}
}
if(_inTran)