summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java281
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java74
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java49
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java173
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java121
13 files changed, 538 insertions, 250 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index be2cee79ee..5dd6619cff 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -99,7 +99,7 @@ public class AMQChannel
private final MessageRouter _exchanges;
- private TransactionalContext _txnContext;
+ private TransactionalContext _txnContext, _nonTransactedContext;
/**
* A context used by the message store enabling it to track context for a given channel even across thread
@@ -113,9 +113,9 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
+ //Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
-
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -210,9 +210,9 @@ public class AMQChannel
}
else
{
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Content header received on channel " + _channelId);
+ _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
routeCurrentMessage();
@@ -234,9 +234,9 @@ public class AMQChannel
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Content body received on channel " + _channelId);
+ _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
}
try
{
@@ -289,8 +289,10 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
- * @param noLocal
- * @param exclusive
+ * @param noLocal Flag stopping own messages being receivied.
+ * @param exclusive Flag requesting exclusive access to the queue
+ * @param acks Are acks enabled for this subscriber
+ * @param filters Filters to apply to this subscriber
*
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
@@ -327,6 +329,8 @@ public class AMQChannel
/**
* Called from the protocol session to close this channel and clean up. T
*
+ * @param session The session to close
+ *
* @throws AMQException if there is an error during closure
*/
public void close(AMQProtocolSession session) throws AMQException
@@ -352,10 +356,23 @@ public class AMQChannel
* @param message the message that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
+ * @param consumerTag The tag for the consumer that is to acknowledge this message.
* @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
{
+ if (_log.isDebugEnabled())
+ {
+ if (queue == null)
+ {
+ _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
+ }
+ else
+ {
+ _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+ }
+ }
+
synchronized (_unacknowledgedMessageMap.getLock())
{
_unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
@@ -363,8 +380,15 @@ public class AMQChannel
}
}
+ private final String id = "(" + System.identityHashCode(this) + ")";
+
+ public String debugIdentity()
+ {
+ return _channelId + id;
+ }
+
/**
- * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+ * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to
* this same channel or to other subscribers.
*
* @throws org.apache.qpid.AMQException if the requeue fails
@@ -374,11 +398,22 @@ public class AMQChannel
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
- TransactionalContext nontransacted = null;
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
}
@@ -386,72 +421,130 @@ public class AMQChannel
{
if (unacked.queue != null)
{
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- if (!(_txnContext instanceof NonTransactionalContext))
- {
- nontransacted.deliver(unacked.message, unacked.queue, false);
- }
- else
- {
- _txnContext.deliver(unacked.message, unacked.queue, false);
- }
+ unacked.message.setRedelivered(true);
+
+ // Deliver Message
+ deliveryContext.deliver(unacked.message, unacked.queue, false);
+
+ // Should we allow access To the DM to directy deliver the message?
+ // As we don't need to check for Consumers or worry about incrementing the message count?
+// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
}
}
}
+ /**
+ * Requeue a single message
+ *
+ * @param deliveryTag The message to requeue
+ *
+ * @throws AMQException If something goes wrong.
+ */
public void requeue(long deliveryTag) throws AMQException
{
UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
- TransactionalContext nontransacted = null;
+
+ // Ensure message is released for redelivery
+ unacked.message.release();
+
+ // Mark message redelivered
+ unacked.message.setRedelivered(true);
+
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
}
- if (!(_txnContext instanceof NonTransactionalContext))
+
+ if (unacked.queue != null)
{
- nontransacted.deliver(unacked.message, unacked.queue, false);
+ //Redeliver the messages to the front of the queue
+ deliveryContext.deliver(unacked.message, unacked.queue, true);
+
+ unacked.message.decrementReference(_storeContext);
}
else
{
- _txnContext.deliver(unacked.message, unacked.queue, false);
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
+ " but no queue defined and no DeadLetter queue so DROPPING message.");
+// _log.error("Requested requeue of message:" + deliveryTag +
+// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
+//
+// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
+//
+// unacked.message.decrementReference(_storeContext);
}
- unacked.message.decrementReference(_storeContext);
}
else
{
- _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+ _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
+
+ if (_log.isDebugEnabled())
+ {
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ int count = 0;
+
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
+ "[" + message.deliveryTag + "]");
+ return false; // Continue
+ }
+
+ public void visitComplete()
+ {
+
+ }
+ });
+ }
}
}
- /** Called to resend all outstanding unacknowledged messages to this same channel.
- * @param session the session
- * @param requeue if true then requeue, else resend
- * @throws org.apache.qpid.AMQException */
- public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+ /**
+ * Called to resend all outstanding unacknowledged messages to this same channel.
+ *
+ * @param requeue Are the messages to be requeued or dropped.
+ *
+ * @throws AMQException When something goes wrong.
+ */
+ public void resend(final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
if (_log.isInfoEnabled())
{
- _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+ _log.info("unacked map Size:" + _unacknowledgedMessageMap.size());
}
+ // Process the Unacked-Map.
+ // Marking messages who still have a consumer for to be resent
+ // and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- long deliveryTag = message.deliveryTag;
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
@@ -503,14 +596,21 @@ public class AMQChannel
{
if (!msgToResend.isEmpty())
{
- _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+ _log.info("Preparing (" + msgToResend.size() + ") message to resend.");
+ }
+ else
+ {
+ _log.info("No message to resend.");
}
}
for (UnacknowledgedMessage message : msgToResend)
{
AMQMessage msg = message.message;
- // Our Java Client will always suspend the channel when resending!!
+ // Our Java Client will always suspend the channel when resending!
+ // If the client has requested the messages be resent then it is
+ // their responsibility to ensure that thay are capable of receiving them
+ // i.e. The channel hasn't been server side suspended.
// if (isSuspended())
// {
// _log.info("Channel is suspended so requeuing");
@@ -518,50 +618,58 @@ public class AMQChannel
// msgToRequeue.add(message);
// }
// else
- {
- //release to allow it to be delivered
- msg.release();
+// {
+ //release to allow it to be delivered
+ msg.release();
- // Without any details from the client about what has been processed we have to mark
- // all messages in the unacked map as redelivered.
- msg.setRedelivered(true);
+ // Without any details from the client about what has been processed we have to mark
+ // all messages in the unacked map as redelivered.
+ msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription();
+ Subscription sub = msg.getDeliveredSubscription();
- if (sub != null)
+ if (sub != null)
+ {
+ // Get the lock so we can tell if the sub scription has closed.
+ // will stop delivery to this subscription until the lock is released.
+ // note: this approach would allow the use of a single queue if the
+ // PreDeliveryQueue would allow head additions.
+ // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute..
+ // needs guidance from AMQP WG Model SIG
+ synchronized (sub.getSendLock())
{
- synchronized (sub.getSendLock())
+ if (sub.isClosed())
{
- if (sub.isClosed())
+ if (_log.isDebugEnabled())
{
- _log.info("Subscription closed during resend so requeuing message");
- //move this message to requeue
- msgToRequeue.add(message);
+ _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message");
}
- else
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
- }
- // Will throw an exception if the sub is closed
- sub.addToResendQueue(msg);
- _unacknowledgedMessageMap.remove(message.deliveryTag);
- // Don't decrement as we are bypassing the normal deliver which increments
- // this is what there is a decrement on the Requeue as deliver will increment.
- // msg.decrementReference(_storeContext);
+ _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub));
}
+ sub.addToResendQueue(msg);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ // Don't decrement as we are bypassing the normal deliver which increments
+ // this is why there is a decrement on the Requeue as deliver will increment.
+ // msg.decrementReference(_storeContext);
}
- }
- else
- {
- _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
- //move this message to requeue
- msgToRequeue.add(message);
- }
+ } // sync(sub.getSendLock)
}
- }
+ else
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ } // for all messages
+// } else !isSuspend
if (_log.isInfoEnabled())
{
@@ -571,26 +679,31 @@ public class AMQChannel
}
}
- TransactionalContext nontransacted = null;
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
}
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- if (!(_txnContext instanceof NonTransactionalContext))
- {
- nontransacted.deliver(message.message, message.queue, true);
- }
- else
- {
- _txnContext.deliver(message.message, message.queue, true);
- }
+ message.message.release();
+ message.message.setRedelivered(true);
+
+ deliveryContext.deliver(message.message, message.queue, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
message.message.decrementReference(_storeContext);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 3f2348b71b..940b5b2bf1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -42,6 +42,21 @@ public class UnacknowledgedMessage
message.incrementReference();
}
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Q:");
+ sb.append(queue);
+ sb.append(" M:");
+ sb.append(message);
+ sb.append(" CT:");
+ sb.append(consumerTag);
+ sb.append(" DT:");
+ sb.append(deliveryTag);
+
+ return sb.toString();
+ }
+
public void discard(StoreContext storeContext) throws AMQException
{
if (queue != null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 99cc60011a..30bbdea2ef 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -196,25 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
}
-
- public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException
- {
- synchronized (_lock)
- {
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
- {
- long deliveryTag = entry.getKey();
- AMQShortString consumerTag = entry.getValue().consumerTag;
- AMQMessage msg = entry.getValue().message;
-
- if(consumerTag != null)
- {
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag);
- }
- }
- }
- }
-
+
public UnacknowledgedMessage get(long key)
{
synchronized (_lock)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index f93b2b25e6..a6972475a6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -47,12 +47,13 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+ BasicAckBody body = evt.getMethod();
if (_log.isDebugEnabled())
{
- _log.debug("Ack received on channel " + evt.getChannelId());
+ _log.debug("Ack(Tag:" + body.deliveryTag + ":Mult:" + body.multiple + ") received on channel " + evt.getChannelId());
}
- BasicAckBody body = evt.getMethod();
+
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
if (channel == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index bc11e4652c..a436c35473 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.resend(session, body.requeue);
+ channel.resend(body.requeue);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index ed13092ded..4e77a5e8b9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,20 +50,67 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
AMQProtocolSession session = stateManager.getProtocolSession();
- _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
-
int channelId = evt.getChannelId();
- UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
-
- _logger.info("Need to reject message:" + message);
-// if (evt.getMethod().requeue)
-// {
-// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
-// }
-// else
-// {
-// // session.getChannel(channelId).resend(message);
-// }
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ ": Requeue:" + evt.getMethod().requeue +
+// ": Resend:" + evt.getMethod().resend +
+ " on channel:" + channelId);
+ }
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(channelId);
+ }
+
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ ": Requeue:" + evt.getMethod().requeue +
+// ": Resend:" + evt.getMethod().resend +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ long deliveryTag = evt.getMethod().deliveryTag;
+
+ UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
+ }
+ else
+ {
+
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() +
+ ": Requeue:" + evt.getMethod().requeue +
+// ": Resend:" + evt.getMethod().resend +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
+// if (!evt.getMethod().resend)
+ {
+ message.message.reject(message.message.getDeliveredSubscription());
+ }
+
+ if (evt.getMethod().requeue)
+ {
+ channel.requeue(deliveryTag);
+ }
+ else
+ {
+ _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
+// message.queue = channel.getDefaultDeadLetterQueue();
+// channel.requeue(deliveryTag);
+ }
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 9a8fce7129..777784ca30 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -63,6 +63,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
}
session.closeChannel(channelId);
+ // Client requested closure so we don't wait for ok we send it
+ stateManager.getProtocolSession().closeChannelOk(channelId);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index a10f44f906..f747f7a840 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -63,7 +63,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(session, false);
+ channel.resend(false);
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index d71f6e3046..133f4809b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -523,8 +523,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
try
{
- markChannelawaitingCloseOk(channelId);
channel.close(this);
+ markChannelawaitingCloseOk(channelId);
}
finally
{
@@ -546,7 +546,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* In our current implementation this is used by the clustering code.
*
- * @param channelId
+ * @param channelId The channel to remove
*/
public void removeChannel(int channelId)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index dedea68d18..6d375c89fe 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -85,11 +85,18 @@ public class AMQMessage
private Subscription _takenBySubcription;
+ private Set<Subscription> _rejectedBy = null;
+
public boolean isTaken()
{
return _taken.get();
}
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -199,7 +206,7 @@ public class AMQMessage
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
- _log.debug("Message created with id " + messageId);
+ _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
}
}
@@ -452,7 +459,9 @@ public class AMQMessage
public void release()
{
+ _log.trace("Releasing Message:" + debugIdentity());
_taken.set(false);
+ _takenBySubcription = null;
}
public boolean checkToken(Object token)
@@ -511,7 +520,7 @@ public class AMQMessage
* @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
+ public void checkDeliveredToConsumer() throws NoConsumersException
{
if (_immediate && !_deliveredToConsumer)
@@ -580,7 +589,8 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
- _txnContext.deliver(this, q, true);
+ //normal deliver so add this message at the end.
+ _txnContext.deliver(this, q, false);
}
}
finally
@@ -801,7 +811,7 @@ public class AMQMessage
public String toString()
{
- return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
_taken + " by:" + _takenBySubcription;
}
@@ -809,4 +819,35 @@ public class AMQMessage
{
return _takenBySubcription;
}
+
+ public void reject(Subscription subscription)
+ {
+ if (subscription != null)
+ {
+ if (_rejectedBy == null)
+ {
+ _rejectedBy = new HashSet<Subscription>();
+ }
+
+ _rejectedBy.add(subscription);
+ }
+ else
+ {
+ _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ }
+ }
+
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ boolean rejected = _rejectedBy != null;
+
+ if (rejected) // We have subscriptions that rejected this message
+ {
+ return _rejectedBy.contains(subscription);
+ }
+ else // This messasge hasn't been rejected yet.
+ {
+ return rejected;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 5bbe1671a7..7c2fe73386 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
*/
public class AMQQueue implements Managable, Comparable
{
+
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -446,7 +447,11 @@ public class AMQQueue implements Managable, Comparable
setExclusive(true);
}
- debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " +
+ "consumer tag {2} with {3}", ps, channel, consumerTag, this));
+ }
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
@@ -486,8 +491,11 @@ public class AMQQueue implements Managable, Comparable
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
- debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
- this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
+ this));
+ }
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
@@ -506,6 +514,10 @@ public class AMQQueue implements Managable, Comparable
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.warn("Auto-deleteing queue:" + this);
+ }
autodelete();
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
@@ -561,14 +573,18 @@ public class AMQQueue implements Managable, Comparable
protected void autodelete() throws AMQException
{
- debug("autodeleting {0}", this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("autodeleting {0}", this));
+ }
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
//fixme not sure what this is doing. should we be passing deliverFirst through here?
- _deliveryMgr.deliver(storeContext, getName(), msg, false);
+ // This code is not used so when it is perhaps it should
+ _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -582,6 +598,10 @@ public class AMQQueue implements Managable, Comparable
}
}
+// public DeliveryManager getDeliveryManager()
+// {
+// return _deliveryMgr;
+// }
public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
@@ -673,14 +693,6 @@ public class AMQQueue implements Managable, Comparable
return "Queue(" + _name + ")@" + System.identityHashCode(this);
}
- private void debug(String msg, Object... args)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(msg, args));
- }
- }
-
public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
{
return _deliveryMgr.performGet(session, channel, acks);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index e70926736d..601effcec7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.MessageQueue;
import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
/** Manages delivery of messages on behalf of a queue */
@@ -86,6 +85,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private AtomicLong _totalMessageSize = new AtomicLong();
private AtomicInteger _extraMessages = new AtomicInteger();
private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
+ private final Object _queueHeadLock = new Object();
+ private String _processingThreadName = "";
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -118,7 +119,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (deliverFirst)
{
- _messages.pushHead(msg);
+ synchronized (_queueHeadLock)
+ {
+ _messages.pushHead(msg);
+ }
}
else
{
@@ -367,16 +371,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
long count = 0;
_lock.lock();
- AMQMessage msg = getNextMessage();
- while (msg != null)
+ synchronized (_queueHeadLock)
{
- //mark this message as taken and get it removed
- msg.taken(null);
- _queue.dequeue(storeContext, msg);
- msg = getNextMessage();
- count++;
- }
+ AMQMessage msg = getNextMessage();
+ while (msg != null)
+ {
+ //and remove it
+ _messages.poll();
+ _queue.dequeue(storeContext, msg);
+ msg = getNextMessage();
+ count++;
+ }
+ }
_lock.unlock();
return count;
}
@@ -390,12 +397,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
-
- while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
+ while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
{
//remove the already taken message
- messages.poll();
+ AMQMessage removed = messages.poll();
+
+ assert removed == message;
+
_totalMessageSize.addAndGet(-message.getSize());
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Removed taken message:" + message.debugIdentity());
+ }
+
// try the next message
message = messages.peek();
}
@@ -409,7 +424,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isTraceEnabled())
{
- _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
") from queue (" + System.identityHashCode(messageQueue) +
") AMQQueue (" + System.identityHashCode(queue) + ")");
}
@@ -417,46 +432,63 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (messageQueue == null)
{
// There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
- if (_log.isDebugEnabled())
+ if (_log.isInfoEnabled())
{
- _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue);
}
return;
}
AMQMessage message = null;
+ AMQMessage removed = null;
try
{
- message = getNextMessage(messageQueue, sub);
-
- // message will be null if we have no messages in the messageQueue.
- if (message == null)
+ synchronized (_queueHeadLock)
{
- if (_log.isTraceEnabled())
+ message = getNextMessage(messageQueue, sub);
+
+ // message will be null if we have no messages in the messageQueue.
+ if (message == null)
{
- _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
+ return;
+ }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
- return;
+
+ sub.send(message, _queue);
+
+ //remove sent message from our queue.
+ removed = messageQueue.poll();
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
}
+
+ if (removed != message)
+ {
+ _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+ }
+
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
- sub.send(message, _queue);
-
- //remove sent message from our queue.
- messageQueue.poll();
- //If we don't remove the message from _messages
- // Otherwise the Async send will never end
if (messageQueue == sub.getResendQueue())
{
if (_log.isTraceEnabled())
{
- _log.trace("All messages sent from resendQueue for " + sub);
+ _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub);
}
if (messageQueue.isEmpty())
{
@@ -469,7 +501,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else if (messageQueue == sub.getPreDeliveryQueue())
{
- _log.info("We could do clean up of the main _message queue here");
+ if (_log.isInfoEnabled())
+ {
+ _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+ }
}
_totalMessageSize.addAndGet(-message.getSize());
@@ -477,7 +512,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
catch (AMQException e)
{
message.release();
- _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -516,6 +551,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
private void processQueue()
{
+ //record thread name
+ if (_log.isDebugEnabled())
+ {
+ _processingThreadName = Thread.currentThread().getName();
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -561,9 +602,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
+ _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
}
- msg.release();
+ // This shouldn't be done here.
+// msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -575,7 +617,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+ _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
if (!msg.getMessagePublishInfo().isImmediate())
{
@@ -587,7 +629,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to:" + currentStatus());
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -598,7 +640,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
") is already delivered.");
}
continue;
@@ -609,7 +651,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg, deliverFirst);
@@ -625,9 +667,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!s.isSuspended())
{
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
msg.taken(s);
@@ -636,33 +678,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else
{
- if (_log.isDebugEnabled())
+ if (_log.isInfoEnabled())
{
- _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
+ "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
}
}
+ }
- if (!msg.isTaken())
+ if (!msg.isTaken())
+ {
+ if (_log.isInfoEnabled())
{
- if (_log.isDebugEnabled())
- {
- _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
- " Subscriber:" + System.identityHashCode(s));
- }
-
- deliver(context, name, msg, deliverFirst);
+ _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
}
- else
+
+ deliver(context, name, msg, deliverFirst);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
{
- if (_log.isDebugEnabled())
- {
- _log.debug(id() + " Message(" + System.identityHashCode(msg) +
- ") has been taken so disregarding deliver request to Subscriber:" +
- System.identityHashCode(s));
- }
+ _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
}
}
}
+
}
finally
{
@@ -674,10 +718,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- //fixme remove
private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
- private String id()
+ private String debugIdentity()
{
return id;
}
@@ -710,7 +753,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async." + currentStatus());
+ _log.debug(debugIdentity() + "Processing Async." + currentStatus());
}
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -725,14 +768,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get() +
- " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
+ " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false");
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 0a2e73880c..20033daac7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -46,6 +46,8 @@ import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
*/
public class SubscriptionImpl implements Subscription
{
+
+ private static final Logger _suspensionlogger = Logger.getLogger("Suspension");
private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
public final AMQChannel channel;
@@ -258,6 +260,12 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
+
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
+
protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
}
}
@@ -265,56 +273,56 @@ public class SubscriptionImpl implements Subscription
private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
throws AMQException
{
- try
- {
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
- }
- queue.dequeue(storeContext, msg);
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
}
- synchronized (channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
-
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- msg.decrementReference(storeContext);
- }
+ queue.dequeue(storeContext, msg);
+ }
+ synchronized (channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ msg.decrementReference(storeContext);
}
- }
- finally
- {
+
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ //Only set delivered if it actually was writen successfully..
+ // using a try->finally would set it even if an error occured.
msg.setDeliveredToConsumer();
}
}
public boolean isSuspended()
{
- if (_logger.isTraceEnabled())
+ if (_suspensionlogger.isInfoEnabled())
{
if (channel.isSuspended())
{
- _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ _suspensionlogger.info("Subscription(" + debugIdentity() + ") channel's is susupended");
}
if (_sendLock.get())
{
- _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ _suspensionlogger.info("Subscription(" + debugIdentity() + ") has sendLock set so closing.");
}
}
return channel.isSuspended() || _sendLock.get();
@@ -323,7 +331,7 @@ public class SubscriptionImpl implements Subscription
/**
* Callback indicating that a queue has been deleted.
*
- * @param queue
+ * @param queue The queue to delete
*/
public void queueDeleted(AMQQueue queue) throws AMQException
{
@@ -337,9 +345,18 @@ public class SubscriptionImpl implements Subscription
public boolean hasInterest(AMQMessage msg)
{
+ //check that the message hasn't been rejected
+ if (msg.isRejectedBy(this))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+ }
+// return false;
+ }
+
if (_noLocal)
{
- boolean isLocal;
// We don't want local messages so check to see if message is one we sent
Object localInstance;
Object msgInstance;
@@ -350,12 +367,12 @@ public class SubscriptionImpl implements Subscription
if ((msg.getPublisher().getClientProperties() != null) &&
(msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ if (localInstance == msgInstance || localInstance.equals(msgInstance))
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ msg.debugIdentity() + ")");
}
return false;
}
@@ -369,8 +386,8 @@ public class SubscriptionImpl implements Subscription
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ msg.debugIdentity() + ")");
}
return false;
}
@@ -383,19 +400,26 @@ public class SubscriptionImpl implements Subscription
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+ _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
}
return checkFilters(msg);
}
+ private String id = String.valueOf(System.identityHashCode(this));
+
+ private String debugIdentity()
+ {
+ return id;
+ }
+
private boolean checkFilters(AMQMessage msg)
{
if (_filters != null)
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+ _logger.trace("(" + debugIdentity() + ") has filters.");
}
return _filters.allAllow(msg);
}
@@ -403,7 +427,7 @@ public class SubscriptionImpl implements Subscription
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ _logger.trace("(" + debugIdentity() + ") has no filters");
}
return true;
@@ -445,15 +469,19 @@ public class SubscriptionImpl implements Subscription
}
_sendLock.set(true);
-
}
+
if (_logger.isInfoEnabled())
{
- _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
}
if (_resendQueue != null && !_resendQueue.isEmpty())
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this);
+ }
requeue();
}
@@ -486,6 +514,11 @@ public class SubscriptionImpl implements Subscription
{
AMQMessage resent = _resendQueue.poll();
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Removed for resending:" + resent.debugIdentity());
+ }
+
resent.release();
_queue.subscriberHasPendingResend(false, this, resent);
@@ -495,7 +528,7 @@ public class SubscriptionImpl implements Subscription
}
catch (AMQException e)
{
- _logger.error("Unable to re-deliver messages", e);
+ _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e);
}
}