summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
commitde248153d311b1e0211dfe3230afcb306f3c0192 (patch)
tree30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/broker/src
parentf74e4dc27d1655760d0213fd60cc75c272c26f00 (diff)
downloadqpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java272
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java165
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java143
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java322
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java232
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java70
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java74
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java2
17 files changed, 944 insertions, 502 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7271bd6e43..7ceb3a7eef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.Subscription;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
@@ -74,28 +75,20 @@ public class AMQChannel
*/
private AtomicLong _deliveryTag = new AtomicLong(0);
- /**
- * A channel has a default queue (the last declared) that is used when no queue name is
- * explictily set
- */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
private AMQQueue _defaultQueue;
- /**
- * This tag is unique per subscription to a queue. The server returns this in response to a
- * basic.consume request.
- */
+ /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
private int _consumerTag;
/**
- * The current message - which may be partial in the sense that not all frames have been received yet -
- * which has been received by this channel. As the frames are received the message gets updated and once all
- * frames have been received the message can then be routed.
+ * The current message - which may be partial in the sense that not all frames have been received yet - which has
+ * been received by this channel. As the frames are received the message gets updated and once all frames have been
+ * received the message can then be routed.
*/
private AMQMessage _currentMessage;
- /**
- * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
- */
+ /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -109,8 +102,8 @@ public class AMQChannel
private TransactionalContext _txnContext;
/**
- * A context used by the message store enabling it to track context for a given channel even across
- * thread boundaries
+ * A context used by the message store enabling it to track context for a given channel even across thread
+ * boundaries
*/
private final StoreContext _storeContext;
@@ -123,7 +116,6 @@ public class AMQChannel
private final AMQProtocolSession _session;
-
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -138,9 +130,7 @@ public class AMQChannel
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- /**
- * Sets this channel to be part of a local transaction
- */
+ /** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
_txnContext = new LocalTransactionalContext(_messageStore, _storeContext, _returnMessages);
@@ -293,17 +283,17 @@ public class AMQChannel
}
/**
- * Subscribe to a queue. We register all subscriptions in the channel so that
- * if the channel is closed we can clean up all subscriptions, even if the
- * client does not explicitly unsubscribe from all queues.
+ * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
+ * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
- * @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
- * @param session the protocol session of the subscriber
+ * @param tag the tag chosen by the client (if null, server will generate one)
+ * @param queue the queue to subscribe to
+ * @param session the protocol session of the subscriber
* @param noLocal
* @param exclusive
- * @return the consumer tag. This is returned to the subscriber and used in
- * subsequent unsubscribe requests
+ *
+ * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
+ *
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
@@ -335,7 +325,7 @@ public class AMQChannel
}
/**
- * Called from the protocol session to close this channel and clean up.
+ * Called from the protocol session to close this channel and clean up. T
*
* @throws AMQException if there is an error during closure
*/
@@ -344,8 +334,6 @@ public class AMQChannel
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
- _txnContext.commit();
-
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
@@ -362,8 +350,8 @@ public class AMQChannel
* Add a message to the channel-based list of unacknowledged messages
*
* @param message the message that was delivered
- * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of
- * the delivery tag)
+ * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
+ * delivery tag)
* @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
@@ -376,8 +364,8 @@ public class AMQChannel
}
/**
- * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
- * May result in delivery to this same channel or to other subscribers.
+ * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+ * this same channel or to other subscribers.
*
* @throws org.apache.qpid.AMQException if the requeue fails
*/
@@ -386,23 +374,75 @@ public class AMQChannel
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
if (unacked.queue != null)
{
- _txnContext.deliver(unacked.message, unacked.queue);
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(unacked.message, unacked.queue, false);
+ }
+ else
+ {
+ _txnContext.deliver(unacked.message, unacked.queue, false);
+ }
}
}
}
+ public void requeue(long deliveryTag) throws AMQException
+ {
+ UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
- /**
- * Called to resend all outstanding unacknowledged messages to this same channel.
- */
+ if (unacked != null)
+ {
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(unacked.message, unacked.queue, false);
+ }
+ else
+ {
+ _txnContext.deliver(unacked.message, unacked.queue, false);
+ }
+ unacked.message.decrementReference(_storeContext);
+ }
+ else
+ {
+ _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+ }
+
+
+ }
+
+
+ /** Called to resend all outstanding unacknowledged messages to this same channel. */
public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+ final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+ }
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
@@ -412,21 +452,40 @@ public class AMQChannel
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
- if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag) && !isSuspended())
+ if (consumerTag != null)
{
- msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ // Consumer exists
+ if (_consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ msgToResend.add(message);
+ }
+ else // consumer has gone
+ {
+ msgToRequeue.add(message);
+ }
}
else
{
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
// cannot resend, so re-queue.
- if (message.queue != null && (consumerTag == null || requeue))
+ if (message.queue != null)
+ {
+ if (requeue)
+ {
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
+ }
+ }
+ else
{
- msgToRequeue.add(message);
+ _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
}
-
+
// false means continue processing
return false;
}
@@ -436,21 +495,112 @@ public class AMQChannel
}
});
- for(UnacknowledgedMessage message : msgToRequeue)
+ // Process Messages to Resend
+ if (_log.isInfoEnabled())
+ {
+ if (!msgToResend.isEmpty())
+ {
+ _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+ }
+ }
+ for (UnacknowledgedMessage message : msgToResend)
+ {
+ AMQMessage msg = message.message;
+
+ // Our Java Client will always suspend the channel when resending!!
+// if (isSuspended())
+// {
+// _log.info("Channel is suspended so requeuing");
+// //move this message to requeue
+// msgToRequeue.add(message);
+// }
+// else
+ {
+ //release to allow it to be delivered
+ msg.release();
+
+ // Without any details from the client about what has been processed we have to mark
+ // all messages in the unacked map as redelivered.
+ msg.setRedelivered(true);
+
+
+ Subscription sub = msg.getDeliveredSubscription();
+
+ if (sub != null)
+ {
+ synchronized (sub.getSendLock())
+ {
+ if (sub.isClosed())
+ {
+ _log.info("Subscription closed during resend so requeuing message");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
+ }
+ // Will throw an exception if the sub is closed
+ sub.addToResendQueue(msg);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ // Don't decrement as we are bypassing the normal deliver which increments
+ // this is what there is a decrement on the Requeue as deliver will increment.
+ // msg.decrementReference(_storeContext);
+ }
+ }
+ }
+ else
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ }
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ if (!msgToRequeue.isEmpty())
+ {
+ _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+ }
+ }
+
+ TransactionalContext nontransacted = null;
+ if (!(_txnContext instanceof NonTransactionalContext))
{
- _txnContext.deliver(message.message, message.queue);
+ nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ // Process Messages to Requeue at the front of the queue
+ for (UnacknowledgedMessage message : msgToRequeue)
+ {
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ if (!(_txnContext instanceof NonTransactionalContext))
+ {
+ nontransacted.deliver(message.message, message.queue, true);
+ }
+ else
+ {
+ _txnContext.deliver(message.message, message.queue, true);
+ }
+
_unacknowledgedMessageMap.remove(message.deliveryTag);
message.message.decrementReference(_storeContext);
}
}
/**
- * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
- * messages to remove the queue reference and also decrement any message reference counts, without
- * actually removing the item since we may get an ack for a delivery tag that was generated from the
- * deleted queue.
+ * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
+ * remove the queue reference and also decrement any message reference counts, without actually removing the item
+ * since we may get an ack for a delivery tag that was generated from the deleted queue.
*
* @param queue the queue that has been deleted
+ *
* @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
public void queueDeleted(final AMQQueue queue) throws AMQException
@@ -487,6 +637,7 @@ public class AMQChannel
* @param deliveryTag the last delivery tag
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
+ *
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
@@ -517,10 +668,10 @@ public class AMQChannel
private void checkSuspension()
{
boolean suspend;
-
- suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
-
+
+ suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+
setSuspended(suspend);
}
@@ -570,8 +721,6 @@ public class AMQChannel
public void rollback() throws AMQException
{
_txnContext.rollback();
-
-
}
public String toString()
@@ -617,8 +766,8 @@ public class AMQChannel
}
else
{
- boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
- if(!willSuspend)
+ boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
+ if (!willSuspend)
{
final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes();
@@ -626,7 +775,7 @@ public class AMQChannel
}
- if(willSuspend)
+ if (willSuspend)
{
setSuspended(true);
}
@@ -634,4 +783,9 @@ public class AMQChannel
}
}
+
+ public TransactionalContext getTransactionalContext()
+ {
+ return _txnContext;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index f8b6babd43..fdf087fdea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -85,7 +85,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
for (UnacknowledgedMessage msg : msgs)
{
remove(msg.deliveryTag);
-
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
new file mode 100644
index 0000000000..ed13092ded
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
+{
+ private static final Logger _logger = Logger.getLogger(BasicRejectMethodHandler.class);
+
+ private static BasicRejectMethodHandler _instance = new BasicRejectMethodHandler();
+
+ public static BasicRejectMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicRejectMethodHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRejectBody> evt) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
+
+ int channelId = evt.getChannelId();
+ UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
+
+ _logger.info("Need to reject message:" + message);
+// if (evt.getMethod().requeue)
+// {
+// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
+// }
+// else
+// {
+// // session.getChannel(channelId).resend(message);
+// }
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index 8ce5a0ea73..a10f44f906 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -62,6 +62,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
+ // Why, are we not allowed to send messages back to client before the ok method?
channel.resend(session, false);
}
catch (AMQException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index c60c22c4e4..aa7ea16afc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -36,21 +36,15 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.txn.TransactionalContext;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /**
- * Used in clustering
- */
+ /** Used in clustering */
private Set<Object> _tokens;
- /**
- * Only use in clustering - should ideally be removed?
- */
+ /** Only use in clustering - should ideally be removed? */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -63,16 +57,14 @@ public class AMQMessage
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a
- * consumer. Used in implementing return functionality for
+ * Flag to indicate whether message has been delivered to a consumer. Used in implementing return functionality for
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
/**
- * We need to keep track of whether the message was 'immediate'
- * as in extreme circumstances, when the checkDelieveredToConsumer
- * is called, the message may already have been received and acknowledged,
- * and the body removed from the store.
+ * We need to keep track of whether the message was 'immediate' as in extreme circumstances, when the
+ * checkDelieveredToConsumer is called, the message may already have been received and acknowledged, and the body
+ * removed from the store.
*/
private boolean _immediate;
@@ -80,11 +72,16 @@ public class AMQMessage
private TransientMessageData _transientMessageData = new TransientMessageData();
+ private Subscription _takenBySubcription;
+ public boolean isTaken()
+ {
+ return _taken.get();
+ }
/**
- * Used to iterate through all the body frames associated with this message. Will not
- * keep all the data in memory therefore is memory-efficient.
+ * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
+ * therefore is memory-efficient.
*/
private class BodyFrameIterator implements Iterator<AMQDataBlock>
{
@@ -103,7 +100,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -153,7 +150,7 @@ public class AMQMessage
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),_messageId) - 1;
+ return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
}
catch (AMQException e)
{
@@ -166,7 +163,7 @@ public class AMQMessage
{
try
{
- return _messageHandle.getContentChunk(getStoreContext(),_messageId, ++_index);
+ return _messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index);
}
catch (AMQException e)
{
@@ -196,12 +193,14 @@ public class AMQMessage
}
/**
- * Used when recovering, i.e. when the message store is creating references to messages.
- * In that case, the normal enqueue/routingComplete is not done since the recovery process
- * is responsible for routing the messages to queues.
+ * Used when recovering, i.e. when the message store is creating references to messages. In that case, the normal
+ * enqueue/routingComplete is not done since the recovery process is responsible for routing the messages to
+ * queues.
+ *
* @param messageId
* @param store
* @param factory
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext) throws AMQException
@@ -213,8 +212,8 @@ public class AMQMessage
}
/**
- * Used in testing only. This allows the passing of the content header immediately
- * on construction.
+ * Used in testing only. This allows the passing of the content header immediately on construction.
+ *
* @param messageId
* @param info
* @param txnContext
@@ -228,14 +227,15 @@ public class AMQMessage
}
/**
- * Used in testing only. This allows the passing of the content header and some body fragments on
- * construction.
+ * Used in testing only. This allows the passing of the content header and some body fragments on construction.
+ *
* @param messageId
* @param info
* @param txnContext
* @param contentHeader
* @param destinationQueues
* @param contentBodies
+ *
* @throws AMQException
*/
public AMQMessage(Long messageId, MessagePublishInfo info,
@@ -280,7 +280,7 @@ public class AMQMessage
}
else
{
- return _messageHandle.getContentHeaderBody(getStoreContext(),_messageId);
+ return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
}
}
@@ -338,16 +338,14 @@ public class AMQMessage
return _messageId;
}
- /**
- * Threadsafe. Increment the reference count on the message.
- */
+ /** Threadsafe. Increment the reference count on the message. */
public void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
}
@@ -355,7 +353,7 @@ public class AMQMessage
/**
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
- *
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
@@ -371,7 +369,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
}
@@ -394,13 +392,13 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId+ "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0,4));
+ _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
}
}
- if(_referenceCount.get()<0)
+ if (_referenceCount.get() < 0)
{
throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
}
@@ -419,7 +417,8 @@ public class AMQMessage
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
@@ -427,10 +426,17 @@ public class AMQMessage
}
-
- public boolean taken()
+ public boolean taken(Subscription sub)
{
- return _taken.getAndSet(true);
+ if (_taken.getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenBySubcription = sub;
+ return false;
+ }
}
public void release()
@@ -441,9 +447,9 @@ public class AMQMessage
public boolean checkToken(Object token)
{
- if(_tokens==null)
+ if (_tokens == null)
{
- _tokens = new HashSet<Object>();
+ _tokens = new HashSet<Object>();
}
if (_tokens.contains(token))
@@ -458,11 +464,12 @@ public class AMQMessage
}
/**
- * Registers a queue to which this message is to be delivered. This is
- * called from the exchange when it is routing the message. This will be called before any content bodies have
- * been received so that the choice of AMQMessageHandle implementation can be picked based on various criteria.
+ * Registers a queue to which this message is to be delivered. This is called from the exchange when it is routing
+ * the message. This will be called before any content bodies have been received so that the choice of
+ * AMQMessageHandle implementation can be picked based on various criteria.
*
* @param queue the queue
+ *
* @throws org.apache.qpid.AMQException if there is an error enqueuing the message
*/
public void enqueue(AMQQueue queue) throws AMQException
@@ -483,16 +490,15 @@ public class AMQMessage
}
else
{
- return _messageHandle.isPersistent(getStoreContext(),_messageId);
+ return _messageHandle.isPersistent(getStoreContext(), _messageId);
}
}
/**
* Called to enforce the 'immediate' flag.
*
- * @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
{
@@ -500,7 +506,7 @@ public class AMQMessage
if (_immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
- }
+ }
}
public MessagePublishInfo getMessagePublishInfo() throws AMQException
@@ -512,7 +518,7 @@ public class AMQMessage
}
else
{
- pb = _messageHandle.getMessagePublishInfo(getStoreContext(),_messageId);
+ pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
}
return pb;
}
@@ -533,10 +539,7 @@ public class AMQMessage
}
- /**
- * Called when this message is delivered to a consumer. (used to
- * implement the 'immediate' flag functionality).
- */
+ /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
@@ -566,7 +569,7 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
- _txnContext.deliver(this, q);
+ _txnContext.deliver(this, q, true);
}
}
finally
@@ -583,23 +586,22 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -609,9 +611,9 @@ public class AMQMessage
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -627,22 +629,21 @@ public class AMQMessage
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),_messageId);
- if(bodyCount == 0)
+ final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
{
SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
- contentHeader);
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
-
//
// Optimise the case where we have a single content body. In that case we create a composite block
// so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
//
- ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, 0);
+ ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
@@ -652,9 +653,9 @@ public class AMQMessage
//
// Now start writing out the other content bodies
//
- for(int i = 1; i < bodyCount; i++)
+ for (int i = 1; i < bodyCount; i++)
{
- cb = _messageHandle.getContentChunk(getStoreContext(),_messageId, i);
+ cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
@@ -685,10 +686,10 @@ public class AMQMessage
AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
protocolSession.getProtocolMinorVersion(),
- deliveryTag, pb.getExchange(),
- queueSize,
- _messageHandle.isRedelivered(),
- pb.getRoutingKey());
+ deliveryTag, pb.getExchange(),
+ queueSize,
+ _messageHandle.isRedelivered(),
+ pb.getRoutingKey());
ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
getOkFrame.writePayload(buf);
buf.flip();
@@ -699,7 +700,7 @@ public class AMQMessage
{
AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ protocolSession.getProtocolMinorVersion(),
getMessagePublishInfo().getExchange(),
replyCode, replyText,
getMessagePublishInfo().getRoutingKey());
@@ -757,12 +758,11 @@ public class AMQMessage
}
catch (AMQException e)
{
- _log.error(e.toString(),e);
+ _log.error(e.toString(), e);
return 0;
}
- }
-
+ }
public void restoreTransientMessageData() throws AMQException
@@ -771,7 +771,7 @@ public class AMQMessage
transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
transientMessageData.setContentHeaderBody(getContentHeaderBody());
transientMessageData.addBodyLength(getContentHeaderBody().getSize());
- _transientMessageData = transientMessageData;
+ _transientMessageData = transientMessageData;
}
@@ -784,6 +784,11 @@ public class AMQMessage
public String toString()
{
return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken;
+ _taken + " by:" + _takenBySubcription;
+ }
+
+ public Subscription getDeliveredSubscription()
+ {
+ return _takenBySubcription;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index e9ebe6c541..429829e201 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -45,13 +45,11 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
/**
- * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like
- * that. It is described fully in RFC 006.
+ * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
+ * fully in RFC 006.
*/
public class AMQQueue implements Managable, Comparable
{
-
-
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -74,26 +72,19 @@ public class AMQQueue implements Managable, Comparable
private static final ExistingSubscriptionPreventsExclusive EXISTING_SUBSCRIPTION = new ExistingSubscriptionPreventsExclusive();
-
private static final Logger _logger = Logger.getLogger(AMQQueue.class);
private final AMQShortString _name;
- /**
- * null means shared
- */
+ /** null means shared */
private final AMQShortString _owner;
private final boolean _durable;
- /**
- * If true, this queue is deleted when the last subscriber is removed
- */
+ /** If true, this queue is deleted when the last subscriber is removed */
private final boolean _autoDelete;
- /**
- * Holds subscribers to the queue.
- */
+ /** Holds subscribers to the queue. */
private final SubscriptionSet _subscribers;
private final SubscriptionFactory _subscriptionFactory;
@@ -106,20 +97,13 @@ public class AMQQueue implements Managable, Comparable
private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- /**
- * Manages message delivery.
- */
+ /** Manages message delivery. */
private final DeliveryManager _deliveryMgr;
- /**
- * Used to track bindings to exchanges so that on deletion they can easily
- * be cancelled.
- */
+ /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
private final ExchangeBindings _bindings = new ExchangeBindings(this);
- /**
- * Executor on which asynchronous delivery will be carriedout where required
- */
+ /** Executor on which asynchronous delivery will be carriedout where required */
private final Executor _asyncDelivery;
private final AMQQueueMBean _managedObject;
@@ -127,39 +111,27 @@ public class AMQQueue implements Managable, Comparable
private final VirtualHost _virtualHost;
- /**
- * max allowed size(KB) of a single message
- */
+ /** max allowed size(KB) of a single message */
@Configured(path = "maximumMessageSize", defaultValue = "0")
public long _maximumMessageSize;
- /**
- * max allowed number of messages on a queue.
- */
+ /** max allowed number of messages on a queue. */
@Configured(path = "maximumMessageCount", defaultValue = "0")
public int _maximumMessageCount;
- /**
- * max queue depth for the queue
- */
+ /** max queue depth for the queue */
@Configured(path = "maximumQueueDepth", defaultValue = "0")
public long _maximumQueueDepth;
- /**
- * maximum message age before alerts occur
- */
+ /** maximum message age before alerts occur */
@Configured(path = "maximumMessageAge", defaultValue = "0")
public long _maximumMessageAge;
- /**
- * the minimum interval between sending out consequetive alerts of the same type
- */
+ /** the minimum interval between sending out consequetive alerts of the same type */
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
- /**
- * total messages received by the queue since startup.
- */
+ /** total messages received by the queue since startup. */
public AtomicLong _totalMessagesReceived = new AtomicLong();
public int compareTo(Object o)
@@ -176,7 +148,6 @@ public class AMQQueue implements Managable, Comparable
}
-
protected AMQQueue(AMQShortString name, boolean durable, AMQShortString owner,
boolean autoDelete, VirtualHost virtualHost,
SubscriptionSet subscribers)
@@ -211,7 +182,7 @@ public class AMQQueue implements Managable, Comparable
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -251,17 +222,13 @@ public class AMQQueue implements Managable, Comparable
return _autoDelete;
}
- /**
- * @return no of messages(undelivered) on the queue.
- */
+ /** @return no of messages(undelivered) on the queue. */
public int getMessageCount()
{
return _deliveryMgr.getQueueMessageCount();
}
- /**
- * @return List of messages(undelivered) on the queue.
- */
+ /** @return List of messages(undelivered) on the queue. */
public List<AMQMessage> getMessagesOnTheQueue()
{
return _deliveryMgr.getMessages();
@@ -275,6 +242,7 @@ public class AMQQueue implements Managable, Comparable
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -294,13 +262,12 @@ public class AMQQueue implements Managable, Comparable
}
/**
- * moves messages from this queue to another queue. to do this the approach is following-
- * - setup the queue for moving messages (hold the lock and stop the async delivery)
- * - get all the messages available in the given message id range
- * - setup the other queue for moving messages (hold the lock and stop the async delivery)
- * - send these available messages to the other queue (enqueue in other queue)
- * - Once sending to other Queue is successful, remove messages from this queue
- * - remove locks from both queues and start async delivery
+ * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
+ * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message
+ * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these
+ * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
+ * remove messages from this queue - remove locks from both queues and start async delivery
+ *
* @param fromMessageId
* @param toMessageId
* @param queueName
@@ -316,7 +283,7 @@ public class AMQQueue implements Managable, Comparable
startMovingMessages();
List<AMQMessage> list = getMessagesOnTheQueue();
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
- int maxMessageCountToBeMoved = (int)(toMessageId - fromMessageId + 1);
+ int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1);
// Run this loop till you find all the messages or the list has no more messages
for (AMQMessage message : list)
@@ -344,7 +311,7 @@ public class AMQQueue implements Managable, Comparable
{
// remove the lock and start the async delivery
anotherQueue.stopMovingMessages();
- stopMovingMessages();
+ stopMovingMessages();
}
}
@@ -364,10 +331,8 @@ public class AMQQueue implements Managable, Comparable
_deliveryMgr.stopMovingMessages();
_deliveryMgr.processAsync(_asyncDelivery);
}
-
- /**
- * @return MBean object associated with this Queue
- */
+
+ /** @return MBean object associated with this Queue */
public ManagedObject getManagedObject()
{
return _managedObject;
@@ -422,20 +387,16 @@ public class AMQQueue implements Managable, Comparable
public long getOldestMessageArrivalTime()
{
return _deliveryMgr.getOldestMessageArrival();
-
+
}
- /**
- * Removes the AMQMessage from the top of the queue.
- */
+ /** Removes the AMQMessage from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
_deliveryMgr.removeAMessageFromTop(storeContext);
}
- /**
- * removes all the messages from the queue.
- */
+ /** removes all the messages from the queue. */
public synchronized long clearQueue(StoreContext storeContext) throws AMQException
{
return _deliveryMgr.clearAllMessages(storeContext);
@@ -443,10 +404,10 @@ public class AMQQueue implements Managable, Comparable
public void bind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
- exchange.registerQueue(routingKey, this, arguments);
- if(isDurable() && exchange.isDurable())
+ exchange.registerQueue(routingKey, this, arguments);
+ if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().bindQueue(exchange,routingKey,this,arguments);
+ _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
}
_bindings.addBinding(routingKey, arguments, exchange);
}
@@ -454,9 +415,9 @@ public class AMQQueue implements Managable, Comparable
public void unBind(AMQShortString routingKey, FieldTable arguments, Exchange exchange) throws AMQException
{
exchange.deregisterQueue(routingKey, this, arguments);
- if(isDurable() && exchange.isDurable())
+ if (isDurable() && exchange.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange,routingKey,this,arguments);
+ _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
}
_bindings.remove(routingKey, arguments, exchange);
}
@@ -466,30 +427,31 @@ public class AMQQueue implements Managable, Comparable
FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
{
- if(incrementSubscriberCount() > 1)
+ if (incrementSubscriberCount() > 1)
{
- if(isExclusive())
+ if (isExclusive())
{
decrementSubscriberCount();
throw EXISTING_EXCLUSIVE;
}
- else if(exclusive)
+ else if (exclusive)
{
decrementSubscriberCount();
throw EXISTING_SUBSCRIPTION;
}
}
- else if(exclusive)
+ else if (exclusive)
{
setExclusive(true);
}
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
+ filters, noLocal, this);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -537,10 +499,10 @@ public class AMQQueue implements Managable, Comparable
" and protocol session key " + ps.getKey() + " not registered with queue " + this);
}
+ removedSubscription.close();
setExclusive(false);
decrementSubscriberCount();
-
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
@@ -583,13 +545,13 @@ public class AMQQueue implements Managable, Comparable
public void delete() throws AMQException
{
- if(!_deleted.getAndSet(true))
+ if (!_deleted.getAndSet(true))
{
_subscribers.queueDeleted(this);
_bindings.deregister();
_virtualHost.getQueueRegistry().unregisterQueue(_name);
_managedObject.unregister();
- for(Task task : _deleteTaskList)
+ for (Task task : _deleteTaskList)
{
task.doTask(this);
}
@@ -605,7 +567,8 @@ public class AMQQueue implements Managable, Comparable
public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg);
+ //fixme not sure what this is doing. should we be passing deliverFirst through here?
+ _deliveryMgr.deliver(storeContext, getName(), msg, false);
try
{
msg.checkDeliveredToConsumer();
@@ -620,9 +583,9 @@ public class AMQQueue implements Managable, Comparable
}
- public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
- _deliveryMgr.deliver(storeContext, getName(), msg);
+ _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -731,7 +694,7 @@ public class AMQQueue implements Managable, Comparable
public static interface Task
{
- public void doTask(AMQQueue queue) throws AMQException;
+ public void doTask(AMQQueue queue) throws AMQException;
}
public void addQueueDeleteTask(Task task)
@@ -759,4 +722,8 @@ public class AMQQueue implements Managable, Comparable
_maximumMessageAge = maximumMessageAge;
}
+ public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+ {
+ _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 0fc8753a87..208a59516c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -24,9 +24,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
+import java.util.Set;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
@@ -38,12 +43,12 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-/**
- * Manages delivery of messages on behalf of a queue
- */
+/** Manages delivery of messages on behalf of a queue */
public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
private static final Logger _log = Logger.getLogger(ConcurrentSelectorDeliveryManager.class);
@@ -51,47 +56,36 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
@Configured(path = "advanced.compressBufferOnQueue",
defaultValue = "false")
public boolean compressBufferOnQueue;
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
- private final ReentrantLock _messageAccessLock = new ReentrantLock();
+ /** Holds any queued messages */
+ private final MessageQueue<AMQMessage> _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
+ /** Ensures that only one asynchronous task is running for this manager at any time. */
private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
+ /** The subscriptions on the queue to whom messages are delivered */
private final SubscriptionManager _subscriptions;
/**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
+ * A reference to the queue we are delivering messages for. We need this to be able to pass the code that handles
+ * acknowledgements a handle on the queue.
*/
private final AMQQueue _queue;
/**
- * Flag used while moving messages from this queue to another. For moving messages the async delivery
- * should also stop. This flat should be set to true to stop async delivery and set to false to enable
- * async delivery again.
+ * Flag used while moving messages from this queue to another. For moving messages the async delivery should also
+ * stop. This flat should be set to true to stop async delivery and set to false to enable async delivery again.
*/
private AtomicBoolean _movingMessages = new AtomicBoolean();
-
+
/**
* Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
+ * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be
+ * delivered via the async thread. <p/> Lock is used to control access to hasQueuedMessages() and over the addition
+ * of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
private AtomicLong _totalMessageSize = new AtomicLong();
-
+ private AtomicInteger _extraMessages = new AtomicInteger();
+ private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,7 +103,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
- private boolean addMessageToQueue(AMQMessage msg)
+ private boolean addMessageToQueue(AMQMessage msg, boolean deliverFirst)
{
// Shrink the ContentBodies to their actual size to save memory.
if (compressBufferOnQueue)
@@ -122,7 +116,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
_totalMessageSize.addAndGet(msg.getSize());
@@ -135,7 +136,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
try
{
- return !_messages.isEmpty();
+ return !(_messages.isEmpty() && _hasContent.isEmpty());
}
finally
{
@@ -149,18 +150,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
+ * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine
+ * size. The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
*
* @return int the number of messages in the delivery queue.
*/
private int getMessageCount()
{
- return _messages.size();
+ return _messages.size() + _extraMessages.get();
}
-
public long getTotalMessageSize()
{
return _totalMessageSize.get();
@@ -172,6 +172,38 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
}
+ public void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg)
+ {
+ _lock.lock();
+ try
+ {
+ if (hasContent)
+ {
+ _log.debug("Queue has adding subscriber content");
+ _hasContent.add(subscription);
+ _totalMessageSize.addAndGet(msg.getSize());
+ _extraMessages.addAndGet(1);
+ }
+ else
+ {
+ _log.debug("Queue has removing subscriber content");
+ if (msg == null)
+ {
+ _hasContent.remove(subscription);
+ }
+ else
+ {
+ _totalMessageSize.addAndGet(-msg.getSize());
+ _extraMessages.addAndGet(-1);
+ }
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
public List<AMQMessage> getMessages()
{
@@ -195,7 +227,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = currentQueue.next();
if (subscription.hasInterest(message))
{
- subscription.enqueueForPreDelivery(message);
+ subscription.enqueueForPreDelivery(message, false);
}
}
}
@@ -203,7 +235,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
{
AMQMessage msg = getNextMessage();
- if(msg == null)
+ if (msg == null)
{
return false;
}
@@ -229,7 +261,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
_queue.dequeue(channel.getStoreContext(), msg);
}
- synchronized(channel)
+ synchronized (channel)
{
long deliveryTag = channel.getNextDeliveryTag();
@@ -252,8 +284,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag,
- * so that the asyn delivery is also stopped.
+ * For feature of moving messages, this method is used. It sets the lock and sets the movingMessages flag, so that
+ * the asyn delivery is also stopped.
*/
public void startMovingMessages()
{
@@ -262,8 +294,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag,
- * so that the async delivery can start again.
+ * Once moving messages to another queue is done or aborted, remove lock and unset the movingMessages flag, so that
+ * the async delivery can start again.
*/
public void stopMovingMessages()
{
@@ -276,6 +308,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Messages will be removed from this queue and all preDeliveryQueues
+ *
* @param messageList
*/
public void removeMovedMessages(List<AMQMessage> messageList)
@@ -308,7 +341,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Now with implementation of predelivery queues, this method will mark the message on the top as taken.
+ *
* @param storeContext
+ *
* @throws AMQException
*/
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
@@ -318,11 +353,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (msg != null)
{
// mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
getNextMessage();
}
-
+
_lock.unlock();
}
@@ -335,7 +370,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
//mark this message as taken and get it removed
- msg.taken();
+ msg.taken(null);
_queue.dequeue(storeContext, msg);
msg = getNextMessage();
count++;
@@ -347,20 +382,15 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public synchronized AMQMessage getNextMessage() throws AMQException
{
- return getNextMessage(_messages);
+ return getNextMessage(_messages, null);
}
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages)
- {
- return getNextMessage(messages, false);
- }
-
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, boolean browsing)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
{
AMQMessage message = messages.peek();
- while (message != null && (browsing || message.taken()))
+
+ while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
{
//remove the already taken message
messages.poll();
@@ -371,27 +401,76 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return message;
}
- public void sendNextMessage(Subscription sub, Queue<AMQMessage> messageQueue)
+ public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue)
{
+
+ Queue<AMQMessage> messageQueue = sub.getNextQueue(_messages);
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ ") from queue (" + System.identityHashCode(messageQueue) +
+ ") AMQQueue (" + System.identityHashCode(queue) + ")");
+ }
+
+ if (messageQueue == null)
+ {
+ // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ }
+ return;
+ }
+
AMQMessage message = null;
try
{
- message = getNextMessage(messageQueue, sub.isBrowser());
+ message = getNextMessage(messageQueue, sub);
// message will be null if we have no messages in the messageQueue.
if (message == null)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
return;
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
sub.send(message, _queue);
//remove sent message from our queue.
messageQueue.poll();
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
+
+ if (messageQueue == sub.getResendQueue())
+ {
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("All messages sent from resendQueue for " + sub);
+ }
+ if (messageQueue.isEmpty())
+ {
+ subscriberHasPendingResend(false, sub, null);
+ //better to use the above method as this keeps all the tracking in one location.
+// _hasContent.remove(sub);
+ }
+
+ _extraMessages.decrementAndGet();
+ }
+ else if (messageQueue == sub.getPreDeliveryQueue())
+ {
+ _log.info("We could do clean up of the main _message queue here");
+ }
+
_totalMessageSize.addAndGet(-message.getSize());
}
catch (AMQException e)
@@ -403,6 +482,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* enqueues the messages in the list on the queue and all required predelivery queues
+ *
* @param storeContext
* @param movedMessageList
*/
@@ -411,7 +491,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.lock();
for (AMQMessage msg : movedMessageList)
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, true);
}
// enqueue on the pre delivery queues
@@ -422,7 +502,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, true);
}
}
}
@@ -430,8 +510,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
+ * Only one thread should ever execute this method concurrently, but it can do so while other threads invoke
+ * deliver().
*/
private void processQueue()
{
@@ -444,40 +524,43 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
for (Subscription sub : _subscriptions.getSubscriptions())
{
- if (!sub.isSuspended())
+ synchronized (sub.getSendLock())
{
- sendNextMessage(sub);
-
- hasSubscribers = true;
- }
- }
- }
- }
+ if (!sub.isSuspended())
+ {
+ sendNextMessage(sub, _queue);
- private void sendNextMessage(Subscription sub)
- {
- if (sub.hasFilters())
- {
- sendNextMessage(sub, sub.getPreDeliveryQueue());
- if (sub.isAutoClose())
- {
- if (sub.getPreDeliveryQueue().isEmpty())
- {
- sub.close();
+ hasSubscribers = true;
+ }
}
}
}
- else
- {
- sendNextMessage(sub, _messages);
- }
}
- public void deliver(StoreContext context, AMQShortString name, AMQMessage msg) throws AMQException
+// private void sendNextMessage(Subscription sub)
+// {
+// if (sub.hasFilters())
+// {
+// sendNextMessage(sub, sub.getPreDeliveryQueue());
+// if (sub.isAutoClose())
+// {
+// if (sub.getPreDeliveryQueue().isEmpty())
+// {
+// sub.close();
+// }
+// }
+// }
+// else
+// {
+// sendNextMessage(sub, _messages);
+// }
+// }
+
+ public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :" + msg);
+ _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
}
msg.release();
@@ -491,11 +574,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery");
+ _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
if (!msg.getMessagePublishInfo().isImmediate())
{
- addMessageToQueue(msg);
+ addMessageToQueue(msg, deliverFirst);
//release lock now message is on queue.
_lock.unlock();
@@ -504,7 +587,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
_log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
- " subscribers to give the message to.");
+ " subscribers to give the message to:" + currentStatus());
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
@@ -528,7 +611,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
- sub.enqueueForPreDelivery(msg);
+ sub.enqueueForPreDelivery(msg, deliverFirst);
}
}
}
@@ -537,14 +620,47 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
//release lock now
_lock.unlock();
-
- if (_log.isDebugEnabled())
+ synchronized (s.getSendLock())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
- System.identityHashCode(s) + ") :" + s);
+ if (!s.isSuspended())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
+ msg.taken(s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ }
+ }
+
+ if (!msg.isTaken())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
+ }
+
+ deliver(context, name, msg, deliverFirst);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" + System.identityHashCode(msg) +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
+ }
+ }
}
- //Deliver the message
- s.send(msg, _queue);
}
}
finally
@@ -593,9 +709,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ _log.debug("Processing Async." + currentStatus());
}
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -608,4 +722,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ private String currentStatus()
+ {
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+ " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
+ "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get() +
+ " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 27abca012b..5b77951dfd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -32,8 +32,8 @@ import org.apache.qpid.server.store.StoreContext;
interface DeliveryManager
{
/**
- * Determines whether there are queued messages. Sets _queueing to false if
- * there are no queued messages. This needs to be atomic.
+ * Determines whether there are queued messages. Sets _queueing to false if there are no queued messages. This needs
+ * to be atomic.
*
* @return true if there are queued messages
*/
@@ -43,34 +43,34 @@ interface DeliveryManager
* This method should not be used to determin if there are messages in the queue.
*
* @return int The number of messages in the queue
+ *
* @use hasQueuedMessages() for all controls relating to having messages on the queue.
*/
int getQueueMessageCount();
/**
- * Requests that the delivery manager start processing the queue asynchronously
- * if there is work that can be done (i.e. there are messages queued up and
- * subscribers that can receive them.
- * <p/>
- * This should be called when subscribers are added, but only after the consume-ok
- * message has been returned as message delivery may start immediately. It should also
- * be called after unsuspending a client.
- * <p/>
+ * Requests that the delivery manager start processing the queue asynchronously if there is work that can be done
+ * (i.e. there are messages queued up and subscribers that can receive them. <p/> This should be called when
+ * subscribers are added, but only after the consume-ok message has been returned as message delivery may start
+ * immediately. It should also be called after unsuspending a client. <p/>
*
* @param executor the executor on which the delivery should take place
*/
void processAsync(Executor executor);
/**
- * Handles message delivery. The delivery manager is always in one of two modes;
- * it is either queueing messages for asynchronous delivery or delivering
- * directly.
+ * Handles message delivery. The delivery manager is always in one of two modes; it is either queueing messages for
+ * asynchronous delivery or delivering directly.
+ *
+ * @param storeContext
+ * @param name the name of the entity on whose behalf we are delivering the message
+ * @param msg the message to deliver
+ * @param deliverFirst
*
- * @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
- * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
+ * @throws org.apache.qpid.server.queue.FailedDequeueException
+ * if the message could not be dequeued
*/
- void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg) throws FailedDequeueException, AMQException;
+ void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
@@ -93,4 +93,6 @@ interface DeliveryManager
long getTotalMessageSize();
long getOldestMessageArrival();
+
+ void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index fa70c6dbac..e9f209839a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -38,13 +38,23 @@ public interface Subscription
Queue<AMQMessage> getPreDeliveryQueue();
- void enqueueForPreDelivery(AMQMessage msg);
+ Queue<AMQMessage> getResendQueue();
+
+ Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages);
+
+ void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst);
boolean isAutoClose();
void close();
+ boolean isClosed();
+
boolean isBrowser();
boolean wouldSuspend(AMQMessage msg);
+
+ void addToResendQueue(AMQMessage msg);
+
+ Object getSendLock();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index 6902788fc8..917f7c4e97 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -26,16 +26,16 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.protocol.AMQProtocolSession;
/**
- * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
- * factory primarily assists testing although in future more sophisticated subscribers may need a different
- * subscription implementation.
+ * Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This factory
+ * primarily assists testing although in future more sophisticated subscribers may need a different subscription
+ * implementation.
*
* @see org.apache.qpid.server.queue.AMQQueue
*/
public interface SubscriptionFactory
{
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks,
- FieldTable filters, boolean noLocal) throws AMQException;
+ FieldTable filters, boolean noLocal, AMQQueue queue) throws AMQException;
Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 6bdfeccc0f..ede7731a06 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.common.ClientProperties;
@@ -37,6 +37,8 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.util.MessageQueue;
+import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
/**
* Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -52,9 +54,11 @@ public class SubscriptionImpl implements Subscription
public final AMQShortString consumerTag;
- private final Object sessionKey;
+ private final Object _sessionKey;
- private Queue<AMQMessage> _messages;
+ private MessageQueue<AMQMessage> _messages;
+
+ private Queue<AMQMessage> _resendQueue;
private final boolean _noLocal;
@@ -63,20 +67,27 @@ public class SubscriptionImpl implements Subscription
private FilterManager _filters;
private final boolean _isBrowser;
private final Boolean _autoClose;
- private boolean _closed = false;
+ private boolean _sentClose = false;
+
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ private AMQQueue _queue;
+ private final AtomicBoolean _sendLock = new AtomicBoolean(false);
+
+
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, AMQQueue queue) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal, queue);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, AMQShortString consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false, null);
}
}
@@ -84,25 +95,27 @@ public class SubscriptionImpl implements Subscription
AMQShortString consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null, false);
+ this(channelId, protocolSession, consumerTag, acks, null, false, null);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ AMQShortString consumerTag, boolean acks, FieldTable filters,
+ boolean noLocal, AMQQueue queue)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
- {
+ {
throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
}
this.channel = channel;
this.protocolSession = protocolSession;
this.consumerTag = consumerTag;
- sessionKey = protocolSession.getKey();
+ _sessionKey = protocolSession.getKey();
_acks = acks;
_noLocal = noLocal;
+ _queue = queue;
_filters = FilterManagerFactory.createManager(filters);
@@ -145,9 +158,7 @@ public class SubscriptionImpl implements Subscription
if (_filters != null)
{
- _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
-
-
+ _messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
else
{
@@ -169,30 +180,47 @@ public class SubscriptionImpl implements Subscription
return (o instanceof SubscriptionImpl) && equals((SubscriptionImpl) o);
}
- /** Equality holds if the session matches and the channel and consumer tag are the same. */
+ /**
+ * Equality holds if the session matches and the channel and consumer tag are the same.
+ *
+ * @param psc The subscriptionImpl to compare
+ *
+ * @return equality
+ */
private boolean equals(SubscriptionImpl psc)
{
- return sessionKey.equals(psc.sessionKey)
+ return _sessionKey.equals(psc._sessionKey)
&& psc.channel == channel
&& psc.consumerTag.equals(consumerTag);
}
public int hashCode()
{
- return sessionKey.hashCode();
+ return _sessionKey.hashCode();
}
public String toString()
{
- return "[channel=" + channel + ", consumerTag=" + consumerTag + ", session=" + protocolSession.getKey() + "]";
+ String subscriber = "[channel=" + channel +
+ ", consumerTag=" + consumerTag +
+ ", session=" + protocolSession.getKey() +
+ ", resendQueue=" + (_resendQueue != null);
+
+ if (_resendQueue != null)
+ {
+ subscriber += ", resendSize=" + _resendQueue.size();
+ }
+
+
+ return subscriber + "]";
}
/**
* This method can be called by each of the publisher threads. As a result all changes to the channel object must be
* thread safe.
*
- * @param msg
- * @param queue
+ * @param msg The message to send
+ * @param queue the Queue it has been sent from
*
* @throws AMQException
*/
@@ -278,7 +306,18 @@ public class SubscriptionImpl implements Subscription
public boolean isSuspended()
{
- return channel.isSuspended();
+ if (_logger.isTraceEnabled())
+ {
+ if (channel.isSuspended())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ }
+ if (_sendLock.get())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ }
+ }
+ return channel.isSuspended() || _sendLock.get();
}
/**
@@ -376,11 +415,18 @@ public class SubscriptionImpl implements Subscription
return _messages;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
if (_messages != null)
{
- _messages.offer(msg);
+ if (deliverFirst)
+ {
+ _messages.pushHead(msg);
+ }
+ else
+ {
+ _messages.offer(msg);
+ }
}
}
@@ -391,19 +437,95 @@ public class SubscriptionImpl implements Subscription
public void close()
{
- if (!_closed)
+ synchronized (_sendLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting SendLock true");
+ }
+
+ _sendLock.set(true);
+
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ }
+
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ requeue();
+ }
+
+ //remove references in PDQ
+ if (_messages != null)
+ {
+ _messages.clear();
+ }
+
+ if (_autoClose && !_sentClose)
{
_logger.info("Closing autoclose subscription:" + this);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- protocolSession.getProtocolMajorVersion(),
- protocolSession.getProtocolMinorVersion(),
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
consumerTag // consumerTag
));
- _closed = true;
+ _sentClose = true;
+ }
+ }
+
+ private void requeue()
+ {
+ if (_queue != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Requeuing :" + _resendQueue.size() + " messages");
+ }
+
+ while (!_resendQueue.isEmpty())
+ {
+ AMQMessage resent = _resendQueue.poll();
+
+ resent.release();
+ _queue.subscriberHasPendingResend(false, this, resent);
+
+ try
+ {
+ channel.getTransactionalContext().deliver(resent, _queue, true);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Unable to re-deliver messages", e);
+ }
+ }
+
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("[MESSAGES LOST]Unable to re-deliver messages as queue is null.");
+ }
+
+ _queue.subscriberHasPendingResend(false, this, null);
}
+ else
+ {
+ if (!_resendQueue.isEmpty())
+ {
+ _logger.error("Unable to re-deliver messages as queue is null.");
+ }
+ }
+
+ // Clear the messages
+ _resendQueue = null;
+ }
+
+
+ public boolean isClosed()
+ {
+ return _sendLock.get(); // This rather than _close is used to signify the subscriber is now closed.
}
public boolean isBrowser()
@@ -416,5 +538,61 @@ public class SubscriptionImpl implements Subscription
return channel.wouldSuspend(msg);
}
+ public Queue<AMQMessage> getResendQueue()
+ {
+ if (_resendQueue == null)
+ {
+ _resendQueue = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ return _resendQueue;
+ }
+
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ if (_resendQueue != null && !_resendQueue.isEmpty())
+ {
+ return _resendQueue;
+ }
+
+ if (_filters != null)
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ close();
+ return null;
+ }
+ }
+ return _messages;
+ }
+ else // we want the DM queue
+ {
+ return messages;
+ }
+ }
+
+ public void addToResendQueue(AMQMessage msg)
+ {
+ // add to our resend queue
+ getResendQueue().add(msg);
+
+ // Mark Queue has having content.
+ if (_queue == null)
+ {
+ _logger.error("Queue is null won't be able to resend messages");
+ }
+ else
+ {
+ _queue.subscriberHasPendingResend(true, this, msg);
+ }
+ }
+
+ public Object getSendLock()
+ {
+ return _sendLock;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index 871f063725..26b040aae0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -27,27 +27,20 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-/**
- * Holds a set of subscriptions for a queue and manages the round
- * robin-ing of deliver etc.
- */
+/** Holds a set of subscriptions for a queue and manages the round robin-ing of deliver etc. */
class SubscriptionSet implements WeightedSubscriptionManager
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
- /**
- * List of registered subscribers
- */
+ /** List of registered subscribers */
private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
- /**
- * Used to control the round robin delivery of content
- */
+ /** Used to control the round robin delivery of content */
private int _currentSubscriber;
+ private final Object _subscriptionsChange = new Object();
- /**
- * Accessor for unit tests.
- */
+
+ /** Accessor for unit tests. */
int getCurrentSubscriber()
{
return _currentSubscriber;
@@ -55,21 +48,43 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- _subscriptions.add(subscription);
+ synchronized (_subscriptionsChange)
+ {
+ _subscriptions.add(subscription);
+ }
}
/**
* Remove the subscription, returning it if it was found
*
* @param subscription
+ *
* @return null if no match was found
*/
public Subscription removeSubscriber(Subscription subscription)
{
- boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
- if (isRemoved)
+ // TODO: possibly need O(1) operation here.
+
+ Subscription sub = null;
+ synchronized (_subscriptionsChange)
{
- return subscription;
+ int subIndex = _subscriptions.indexOf(subscription);
+
+ if (subIndex != -1)
+ {
+ //we can't just return the passed in subscription as it is a new object
+ // and doesn't contain the stored state we need.
+ //NOTE while this may be removed now anyone with an iterator will still have it in the list!!
+ sub = _subscriptions.remove(subIndex);
+ }
+ else
+ {
+ _log.error("Unable to remove from index(" + subIndex + ")subscription:" + subscription);
+ }
+ }
+ if (sub != null)
+ {
+ return sub;
}
else
{
@@ -92,14 +107,11 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Return the next unsuspended subscription or null if not found.
- * <p/>
- * Performance note:
- * This method can scan all items twice when looking for a subscription that is not
- * suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
- * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
- * race conditions and when subscriptions are removed between calls to nextSubscriber, the
- * IndexOutOfBoundsException also causes the scan to start at the beginning.
+ * Return the next unsuspended subscription or null if not found. <p/> Performance note: This method can scan all
+ * items twice when looking for a subscription that is not suspended. The worst case occcurs when all subscriptions
+ * are suspended. However, it is does this without synchronisation and subscriptions may be added and removed
+ * concurrently. Also note that because of race conditions and when subscriptions are removed between calls to
+ * nextSubscriber, the IndexOutOfBoundsException also causes the scan to start at the beginning.
*/
public Subscription nextSubscriber(AMQMessage msg)
{
@@ -156,9 +168,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
return null;
}
- /**
- * Overridden in test classes.
- */
+ /** Overridden in test classes. */
protected void subscriberScanned()
{
}
@@ -199,8 +209,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
}
/**
- * Notification that a queue has been deleted. This is called so that the subscription can inform the
- * channel, which in turn can update its list of unacknowledged messages.
+ * Notification that a queue has been deleted. This is called so that the subscription can inform the channel, which
+ * in turn can update its list of unacknowledged messages.
*
* @param queue
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 29efdd9513..d12f5cd084 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -55,6 +55,7 @@ import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.handler.BasicAckMethodHandler;
@@ -82,8 +83,9 @@ import org.apache.qpid.server.handler.QueueDeclareHandler;
import org.apache.qpid.server.handler.QueueDeleteHandler;
import org.apache.qpid.server.handler.QueuePurgeHandler;
import org.apache.qpid.server.handler.TxCommitHandler;
-import org.apache.qpid.server.handler.TxRollbackHandler;
+import org.apache.qpid.server.handler.BasicRejectMethodHandler;
import org.apache.qpid.server.handler.TxSelectHandler;
+import org.apache.qpid.server.handler.TxRollbackHandler;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -173,6 +175,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(TxSelectBody.class, TxSelectHandler.getInstance());
frame2handlerMap.put(TxCommitBody.class, TxCommitHandler.getInstance());
frame2handlerMap.put(TxRollbackBody.class, TxRollbackHandler.getInstance());
+ frame2handlerMap.put(BasicRejectBody.class, BasicRejectMethodHandler.getInstance());
_state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
deleted file mode 100644
index 4dff514ff4..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.txn;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.StoreContext;
-
-/**
- * @author Robert Greig (robert.j.greig@jpmorgan.com)
- */
-public class DeliverMessageOperation implements TxnOp
-{
- private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class);
-
- private final AMQMessage _msg;
-
- private final AMQQueue _queue;
-
- public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
- {
- _msg = msg;
- _queue = queue;
- _msg.incrementReference();
- }
-
- public void prepare(StoreContext context) throws AMQException
- {
- }
-
- public void undoPrepare()
- {
- }
-
- public void commit(StoreContext context) throws AMQException
- {
- //do the memeory part of the record()
- _msg.incrementReference();
- //then process the message
- try
- {
- _queue.process(context, _msg);
- }
- catch (AMQException e)
- {
- //TODO: is there anything else we can do here? I think not...
- _logger.error("Error during commit of a queue delivery: " + e, e);
- }
- }
-
- public void rollback(StoreContext storeContext)
- {
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 5c915b5c84..e5cce672f6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -31,9 +31,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-/**
- * A transactional context that only supports local transactions.
- */
+/** A transactional context that only supports local transactions. */
public class LocalTransactionalContext implements TransactionalContext
{
private static final Logger _log = Logger.getLogger(LocalTransactionalContext.class);
@@ -62,12 +60,14 @@ public class LocalTransactionalContext implements TransactionalContext
{
public AMQMessage message;
public AMQQueue queue;
+ private boolean deliverFirst;
- public DeliveryDetails(AMQMessage message, AMQQueue queue)
+ public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
{
this.message = message;
this.queue = queue;
+ this.deliverFirst = deliverFirst;
}
}
@@ -89,9 +89,10 @@ public class LocalTransactionalContext implements TransactionalContext
public void rollback() throws AMQException
{
_txnBuffer.rollback(_storeContext);
+ _postCommitDeliveryList.clear();
}
- public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
{
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
@@ -100,7 +101,7 @@ public class LocalTransactionalContext implements TransactionalContext
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
message.incrementReference();
- _postCommitDeliveryList.add(new DeliveryDetails(message, queue));
+ _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
if (_log.isDebugEnabled())
@@ -225,7 +226,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
- dd.queue.process(_storeContext, dd.message);
+ dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
}
}
finally
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index c7f3a0f0f1..19146da22e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -34,21 +34,15 @@ import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-/**
- * @author Apache Software Foundation
- */
+/** @author Apache Software Foundation */
public class NonTransactionalContext implements TransactionalContext
{
private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
- /**
- * Channel is useful for logging
- */
+ /** Channel is useful for logging */
private final AMQChannel _channel;
- /**
- * Where to put undeliverable messages
- */
+ /** Where to put undeliverable messages */
private final List<RequiredDeliveryException> _returnMessages;
private Set<Long> _browsedAcks;
@@ -57,9 +51,7 @@ public class NonTransactionalContext implements TransactionalContext
private StoreContext _storeContext;
- /**
- * Whether we are in a transaction
- */
+ /** Whether we are in a transaction */
private boolean _inTran;
public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
@@ -97,12 +89,12 @@ public class NonTransactionalContext implements TransactionalContext
// Does not apply to this context
}
- public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ public void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException
{
try
{
message.incrementReference();
- queue.process(_storeContext, message);
+ queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
message.checkDeliveredToConsumer();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
index 59d9117fda..88451e2fca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
@@ -38,7 +38,7 @@ public interface TransactionalContext
void rollback() throws AMQException;
- void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+ void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst) throws AMQException;
void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;