diff options
Diffstat (limited to 'qpid/java/broker/src')
13 files changed, 538 insertions, 250 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index be2cee79ee..5dd6619cff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -99,7 +99,7 @@ public class AMQChannel private final MessageRouter _exchanges; - private TransactionalContext _txnContext; + private TransactionalContext _txnContext, _nonTransactedContext; /** * A context used by the message store enabling it to track context for a given channel even across thread @@ -113,9 +113,9 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + //Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -210,9 +210,9 @@ public class AMQChannel } else { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Content header received on channel " + _channelId); + _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } _currentMessage.setContentHeaderBody(contentHeaderBody); routeCurrentMessage(); @@ -234,9 +234,9 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Content body received on channel " + _channelId); + _log.trace(debugIdentity() + "Content body received on channel " + _channelId); } try { @@ -289,8 +289,10 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber - * @param noLocal - * @param exclusive + * @param noLocal Flag stopping own messages being receivied. + * @param exclusive Flag requesting exclusive access to the queue + * @param acks Are acks enabled for this subscriber + * @param filters Filters to apply to this subscriber * * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * @@ -327,6 +329,8 @@ public class AMQChannel /** * Called from the protocol session to close this channel and clean up. T * + * @param session The session to close + * * @throws AMQException if there is an error during closure */ public void close(AMQProtocolSession session) throws AMQException @@ -352,10 +356,23 @@ public class AMQChannel * @param message the message that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) + * @param consumerTag The tag for the consumer that is to acknowledge this message. * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) { + if (_log.isDebugEnabled()) + { + if (queue == null) + { + _log.debug("Adding unacked message with a null queue:" + message.debugIdentity()); + } + else + { + _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity()); + } + } + synchronized (_unacknowledgedMessageMap.getLock()) { _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); @@ -363,8 +380,15 @@ public class AMQChannel } } + private final String id = "(" + System.identityHashCode(this) + ")"; + + public String debugIdentity() + { + return _channelId + id; + } + /** - * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to + * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to * this same channel or to other subscribers. * * @throws org.apache.qpid.AMQException if the requeue fails @@ -374,11 +398,22 @@ public class AMQChannel // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); - TransactionalContext nontransacted = null; + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } @@ -386,72 +421,130 @@ public class AMQChannel { if (unacked.queue != null) { - // Deliver these messages out of the transaction as their delivery was never - // part of the transaction only the receive. - if (!(_txnContext instanceof NonTransactionalContext)) - { - nontransacted.deliver(unacked.message, unacked.queue, false); - } - else - { - _txnContext.deliver(unacked.message, unacked.queue, false); - } + unacked.message.setRedelivered(true); + + // Deliver Message + deliveryContext.deliver(unacked.message, unacked.queue, false); + + // Should we allow access To the DM to directy deliver the message? + // As we don't need to check for Consumers or worry about incrementing the message count? +// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } } } + /** + * Requeue a single message + * + * @param deliveryTag The message to requeue + * + * @throws AMQException If something goes wrong. + */ public void requeue(long deliveryTag) throws AMQException { UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { - TransactionalContext nontransacted = null; + + // Ensure message is released for redelivery + unacked.message.release(); + + // Mark message redelivered + unacked.message.setRedelivered(true); + + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } - if (!(_txnContext instanceof NonTransactionalContext)) + + if (unacked.queue != null) { - nontransacted.deliver(unacked.message, unacked.queue, false); + //Redeliver the messages to the front of the queue + deliveryContext.deliver(unacked.message, unacked.queue, true); + + unacked.message.decrementReference(_storeContext); } else { - _txnContext.deliver(unacked.message, unacked.queue, false); + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag + + " but no queue defined and no DeadLetter queue so DROPPING message."); +// _log.error("Requested requeue of message:" + deliveryTag + +// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); +// +// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); +// +// unacked.message.decrementReference(_storeContext); } - unacked.message.decrementReference(_storeContext); } else { - _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists"); + _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); + + if (_log.isDebugEnabled()) + { + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + int count = 0; + + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" + + "[" + message.deliveryTag + "]"); + return false; // Continue + } + + public void visitComplete() + { + + } + }); + } } } - /** Called to resend all outstanding unacknowledged messages to this same channel. - * @param session the session - * @param requeue if true then requeue, else resend - * @throws org.apache.qpid.AMQException */ - public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException + /** + * Called to resend all outstanding unacknowledged messages to this same channel. + * + * @param requeue Are the messages to be requeued or dropped. + * + * @throws AMQException When something goes wrong. + */ + public void resend(final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); if (_log.isInfoEnabled()) { - _log.info("unacked map contains " + _unacknowledgedMessageMap.size()); + _log.info("unacked map Size:" + _unacknowledgedMessageMap.size()); } + // Process the Unacked-Map. + // Marking messages who still have a consumer for to be resent + // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException { - long deliveryTag = message.deliveryTag; AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); @@ -503,14 +596,21 @@ public class AMQChannel { if (!msgToResend.isEmpty()) { - _log.info("Preparing (" + msgToResend.size() + ") message to resend to."); + _log.info("Preparing (" + msgToResend.size() + ") message to resend."); + } + else + { + _log.info("No message to resend."); } } for (UnacknowledgedMessage message : msgToResend) { AMQMessage msg = message.message; - // Our Java Client will always suspend the channel when resending!! + // Our Java Client will always suspend the channel when resending! + // If the client has requested the messages be resent then it is + // their responsibility to ensure that thay are capable of receiving them + // i.e. The channel hasn't been server side suspended. // if (isSuspended()) // { // _log.info("Channel is suspended so requeuing"); @@ -518,50 +618,58 @@ public class AMQChannel // msgToRequeue.add(message); // } // else - { - //release to allow it to be delivered - msg.release(); +// { + //release to allow it to be delivered + msg.release(); - // Without any details from the client about what has been processed we have to mark - // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + // Without any details from the client about what has been processed we have to mark + // all messages in the unacked map as redelivered. + msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(); + Subscription sub = msg.getDeliveredSubscription(); - if (sub != null) + if (sub != null) + { + // Get the lock so we can tell if the sub scription has closed. + // will stop delivery to this subscription until the lock is released. + // note: this approach would allow the use of a single queue if the + // PreDeliveryQueue would allow head additions. + // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute.. + // needs guidance from AMQP WG Model SIG + synchronized (sub.getSendLock()) { - synchronized (sub.getSendLock()) + if (sub.isClosed()) { - if (sub.isClosed()) + if (_log.isDebugEnabled()) { - _log.info("Subscription closed during resend so requeuing message"); - //move this message to requeue - msgToRequeue.add(message); + _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message"); } - else + //move this message to requeue + msgToRequeue.add(message); + } + else + { + if (_log.isDebugEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend"); - } - // Will throw an exception if the sub is closed - sub.addToResendQueue(msg); - _unacknowledgedMessageMap.remove(message.deliveryTag); - // Don't decrement as we are bypassing the normal deliver which increments - // this is what there is a decrement on the Requeue as deliver will increment. - // msg.decrementReference(_storeContext); + _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub)); } + sub.addToResendQueue(msg); + _unacknowledgedMessageMap.remove(message.deliveryTag); + // Don't decrement as we are bypassing the normal deliver which increments + // this is why there is a decrement on the Requeue as deliver will increment. + // msg.decrementReference(_storeContext); } - } - else - { - _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); - //move this message to requeue - msgToRequeue.add(message); - } + } // sync(sub.getSendLock) } - } + else + { + _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); + //move this message to requeue + msgToRequeue.add(message); + } + } // for all messages +// } else !isSuspend if (_log.isInfoEnabled()) { @@ -571,26 +679,31 @@ public class AMQChannel } } - TransactionalContext nontransacted = null; + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } // Process Messages to Requeue at the front of the queue for (UnacknowledgedMessage message : msgToRequeue) { - // Deliver these messages out of the transaction as their delivery was never - // part of the transaction only the receive. - if (!(_txnContext instanceof NonTransactionalContext)) - { - nontransacted.deliver(message.message, message.queue, true); - } - else - { - _txnContext.deliver(message.message, message.queue, true); - } + message.message.release(); + message.message.setRedelivered(true); + + deliveryContext.deliver(message.message, message.queue, true); _unacknowledgedMessageMap.remove(message.deliveryTag); message.message.decrementReference(_storeContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 3f2348b71b..940b5b2bf1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -42,6 +42,21 @@ public class UnacknowledgedMessage message.incrementReference(); } + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("Q:"); + sb.append(queue); + sb.append(" M:"); + sb.append(message); + sb.append(" CT:"); + sb.append(consumerTag); + sb.append(" DT:"); + sb.append(deliveryTag); + + return sb.toString(); + } + public void discard(StoreContext storeContext) throws AMQException { if (queue != null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 99cc60011a..30bbdea2ef 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -196,25 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } } - - public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException - { - synchronized (_lock) - { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) - { - long deliveryTag = entry.getKey(); - AMQShortString consumerTag = entry.getValue().consumerTag; - AMQMessage msg = entry.getValue().message; - - if(consumerTag != null) - { - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag); - } - } - } - } - + public UnacknowledgedMessage get(long key) { synchronized (_lock) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index f93b2b25e6..a6972475a6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,12 +47,13 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + BasicAckBody body = evt.getMethod(); if (_log.isDebugEnabled()) { - _log.debug("Ack received on channel " + evt.getChannelId()); + _log.debug("Ack(Tag:" + body.deliveryTag + ":Mult:" + body.multiple + ") received on channel " + evt.getChannelId()); } - BasicAckBody body = evt.getMethod(); + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); if (channel == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index bc11e4652c..a436c35473 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.resend(session, body.requeue); + channel.resend(body.requeue); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index ed13092ded..4e77a5e8b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -49,20 +50,67 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { AMQProtocolSession session = stateManager.getProtocolSession(); - _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue); - int channelId = evt.getChannelId(); - UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag); - - _logger.info("Need to reject message:" + message); -// if (evt.getMethod().requeue) -// { -// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag); -// } -// else -// { -// // session.getChannel(channelId).resend(message); -// } + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channelId); + } + + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(channelId); + } + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channel.debugIdentity()); + } + + long deliveryTag = evt.getMethod().deliveryTag; + + UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + + if (message == null) + { + _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); +// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); + } + else + { + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channel.debugIdentity()); + } + + // If we haven't requested message to be resent to this consumer then reject it from ever getting it. +// if (!evt.getMethod().resend) + { + message.message.reject(message.message.getDeliveredSubscription()); + } + + if (evt.getMethod().requeue) + { + channel.requeue(deliveryTag); + } + else + { + _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); +// message.queue = channel.getDefaultDeadLetterQueue(); +// channel.requeue(deliveryTag); + } + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 9a8fce7129..777784ca30 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -63,6 +63,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos } session.closeChannel(channelId); + // Client requested closure so we don't wait for ok we send it + stateManager.getProtocolSession().closeChannelOk(channelId); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index a10f44f906..f747f7a840 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -63,7 +63,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). // Why, are we not allowed to send messages back to client before the ok method? - channel.resend(session, false); + channel.resend(false); } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d71f6e3046..133f4809b4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -523,8 +523,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { try { - markChannelawaitingCloseOk(channelId); channel.close(this); + markChannelawaitingCloseOk(channelId); } finally { @@ -546,7 +546,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * In our current implementation this is used by the clustering code. * - * @param channelId + * @param channelId The channel to remove */ public void removeChannel(int channelId) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index dedea68d18..6d375c89fe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -85,11 +85,18 @@ public class AMQMessage private Subscription _takenBySubcription; + private Set<Subscription> _rejectedBy = null; + public boolean isTaken() { return _taken.get(); } + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")"; + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -199,7 +206,7 @@ public class AMQMessage _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { - _log.debug("Message created with id " + messageId); + _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId); } } @@ -452,7 +459,9 @@ public class AMQMessage public void release() { + _log.trace("Releasing Message:" + debugIdentity()); _taken.set(false); + _takenBySubcription = null; } public boolean checkToken(Object token) @@ -511,7 +520,7 @@ public class AMQMessage * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException, AMQException + public void checkDeliveredToConsumer() throws NoConsumersException { if (_immediate && !_deliveredToConsumer) @@ -580,7 +589,8 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { - _txnContext.deliver(this, q, true); + //normal deliver so add this message at the end. + _txnContext.deliver(this, q, false); } } finally @@ -801,7 +811,7 @@ public class AMQMessage public String toString() { - return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + _taken + " by:" + _takenBySubcription; } @@ -809,4 +819,35 @@ public class AMQMessage { return _takenBySubcription; } + + public void reject(Subscription subscription) + { + if (subscription != null) + { + if (_rejectedBy == null) + { + _rejectedBy = new HashSet<Subscription>(); + } + + _rejectedBy.add(subscription); + } + else + { + _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + } + } + + public boolean isRejectedBy(Subscription subscription) + { + boolean rejected = _rejectedBy != null; + + if (rejected) // We have subscriptions that rejected this message + { + return _rejectedBy.contains(subscription); + } + else // This messasge hasn't been rejected yet. + { + return rejected; + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 5bbe1671a7..7c2fe73386 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; */ public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException { @@ -446,7 +447,11 @@ public class AMQQueue implements Managable, Comparable setExclusive(true); } - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " + + "consumer tag {2} with {3}", ps, channel, consumerTag, this)); + } Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); @@ -486,8 +491,11 @@ public class AMQQueue implements Managable, Comparable public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { - debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, + this)); + } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, @@ -506,6 +514,10 @@ public class AMQQueue implements Managable, Comparable // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { + if (_logger.isInfoEnabled()) + { + _logger.warn("Auto-deleteing queue:" + this); + } autodelete(); // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared @@ -561,14 +573,18 @@ public class AMQQueue implements Managable, Comparable protected void autodelete() throws AMQException { - debug("autodeleting {0}", this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("autodeleting {0}", this)); + } delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException + public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { //fixme not sure what this is doing. should we be passing deliverFirst through here? - _deliveryMgr.deliver(storeContext, getName(), msg, false); + // This code is not used so when it is perhaps it should + _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -582,6 +598,10 @@ public class AMQQueue implements Managable, Comparable } } +// public DeliveryManager getDeliveryManager() +// { +// return _deliveryMgr; +// } public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { @@ -673,14 +693,6 @@ public class AMQQueue implements Managable, Comparable return "Queue(" + _name + ")@" + System.identityHashCode(this); } - private void debug(String msg, Object... args) - { - if (_logger.isDebugEnabled()) - { - _logger.debug(MessageFormat.format(msg, args)); - } - } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException { return _deliveryMgr.performGet(session, channel, acks); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index e70926736d..601effcec7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.MessageQueue; import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** Manages delivery of messages on behalf of a queue */ @@ -86,6 +85,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private AtomicLong _totalMessageSize = new AtomicLong(); private AtomicInteger _extraMessages = new AtomicInteger(); private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); + private final Object _queueHeadLock = new Object(); + private String _processingThreadName = ""; ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -118,7 +119,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (deliverFirst) { - _messages.pushHead(msg); + synchronized (_queueHeadLock) + { + _messages.pushHead(msg); + } } else { @@ -367,16 +371,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long count = 0; _lock.lock(); - AMQMessage msg = getNextMessage(); - while (msg != null) + synchronized (_queueHeadLock) { - //mark this message as taken and get it removed - msg.taken(null); - _queue.dequeue(storeContext, msg); - msg = getNextMessage(); - count++; - } + AMQMessage msg = getNextMessage(); + while (msg != null) + { + //and remove it + _messages.poll(); + _queue.dequeue(storeContext, msg); + msg = getNextMessage(); + count++; + } + } _lock.unlock(); return count; } @@ -390,12 +397,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - - while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub))) + while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub))) { //remove the already taken message - messages.poll(); + AMQMessage removed = messages.poll(); + + assert removed == message; + _totalMessageSize.addAndGet(-message.getSize()); + + if (_log.isTraceEnabled()) + { + _log.trace("Removed taken message:" + message.debugIdentity()); + } + // try the next message message = messages.peek(); } @@ -409,7 +424,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isTraceEnabled()) { - _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + ") AMQQueue (" + System.identityHashCode(queue) + ")"); } @@ -417,46 +432,63 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == null) { // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector - if (_log.isDebugEnabled()) + if (_log.isInfoEnabled()) { - _log.debug(sub + ": asked to send messages but has none on given queue:" + queue); + _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue); } return; } AMQMessage message = null; + AMQMessage removed = null; try { - message = getNextMessage(messageQueue, sub); - - // message will be null if we have no messages in the messageQueue. - if (message == null) + synchronized (_queueHeadLock) { - if (_log.isTraceEnabled()) + message = getNextMessage(messageQueue, sub); + + // message will be null if we have no messages in the messageQueue. + if (message == null) { - _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + if (_log.isTraceEnabled()) + { + _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } + return; + } + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } - return; + + sub.send(message, _queue); + + //remove sent message from our queue. + removed = messageQueue.poll(); + //If we don't remove the message from _messages + // Otherwise the Async send will never end } + + if (removed != message) + { + _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed); + } + if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } - sub.send(message, _queue); - - //remove sent message from our queue. - messageQueue.poll(); - //If we don't remove the message from _messages - // Otherwise the Async send will never end if (messageQueue == sub.getResendQueue()) { if (_log.isTraceEnabled()) { - _log.trace("All messages sent from resendQueue for " + sub); + _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub); } if (messageQueue.isEmpty()) { @@ -469,7 +501,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else if (messageQueue == sub.getPreDeliveryQueue()) { - _log.info("We could do clean up of the main _message queue here"); + if (_log.isInfoEnabled()) + { + _log.info(debugIdentity() + "We could do clean up of the main _message queue here"); + } } _totalMessageSize.addAndGet(-message.getSize()); @@ -477,7 +512,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager catch (AMQException e) { message.release(); - _log.error("Unable to deliver message as dequeue failed: " + e, e); + _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e); } } @@ -516,6 +551,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private void processQueue() { + //record thread name + if (_log.isDebugEnabled()) + { + _processingThreadName = Thread.currentThread().getName(); + } + // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -561,9 +602,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg); + _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg); } - msg.release(); + // This shouldn't be done here. +// msg.release(); //Check if we have someone to deliver the message to. _lock.lock(); @@ -575,7 +617,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); + _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); } if (!msg.getMessagePublishInfo().isImmediate()) { @@ -587,7 +629,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to:" + currentStatus()); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -598,7 +640,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); } continue; @@ -609,7 +651,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg, deliverFirst); @@ -625,9 +667,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } msg.taken(s); @@ -636,33 +678,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { - if (_log.isDebugEnabled()) + if (_log.isInfoEnabled()) { - _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + + "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); } } + } - if (!msg.isTaken()) + if (!msg.isTaken()) + { + if (_log.isInfoEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + - " Subscriber:" + System.identityHashCode(s)); - } - - deliver(context, name, msg, deliverFirst); + _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); } - else + + deliver(context, name, msg, deliverFirst); + } + else + { + if (_log.isDebugEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug(id() + " Message(" + System.identityHashCode(msg) + - ") has been taken so disregarding deliver request to Subscriber:" + - System.identityHashCode(s)); - } + _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); } } } + } finally { @@ -674,10 +718,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - //fixme remove private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")"; - private String id() + private String debugIdentity() { return id; } @@ -710,7 +753,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async." + currentStatus()); + _log.debug(debugIdentity() + "Processing Async." + currentStatus()); } if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) @@ -725,14 +768,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get() + - " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") "; + " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false"); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 0a2e73880c..20033daac7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -46,6 +46,8 @@ import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; */ public class SubscriptionImpl implements Subscription { + + private static final Logger _suspensionlogger = Logger.getLogger("Suspension"); private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); public final AMQChannel channel; @@ -258,6 +260,12 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } + + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -265,56 +273,56 @@ public class SubscriptionImpl implements Subscription private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) throws AMQException { - try - { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) + { + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); - } - queue.dequeue(storeContext, msg); + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); } - synchronized (channel) - { - long deliveryTag = channel.getNextDeliveryTag(); - - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - msg.decrementReference(storeContext); - } + queue.dequeue(storeContext, msg); + } + synchronized (channel) + { + long deliveryTag = channel.getNextDeliveryTag(); - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + msg.decrementReference(storeContext); } - } - finally - { + + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + //Only set delivered if it actually was writen successfully.. + // using a try->finally would set it even if an error occured. msg.setDeliveredToConsumer(); } } public boolean isSuspended() { - if (_logger.isTraceEnabled()) + if (_suspensionlogger.isInfoEnabled()) { if (channel.isSuspended()) { - _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + _suspensionlogger.info("Subscription(" + debugIdentity() + ") channel's is susupended"); } if (_sendLock.get()) { - _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + _suspensionlogger.info("Subscription(" + debugIdentity() + ") has sendLock set so closing."); } } return channel.isSuspended() || _sendLock.get(); @@ -323,7 +331,7 @@ public class SubscriptionImpl implements Subscription /** * Callback indicating that a queue has been deleted. * - * @param queue + * @param queue The queue to delete */ public void queueDeleted(AMQQueue queue) throws AMQException { @@ -337,9 +345,18 @@ public class SubscriptionImpl implements Subscription public boolean hasInterest(AMQMessage msg) { + //check that the message hasn't been rejected + if (msg.isRejectedBy(this)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity()); + } +// return false; + } + if (_noLocal) { - boolean isLocal; // We don't want local messages so check to see if message is one we sent Object localInstance; Object msgInstance; @@ -350,12 +367,12 @@ public class SubscriptionImpl implements Subscription if ((msg.getPublisher().getClientProperties() != null) && (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + if (localInstance == msgInstance || localInstance.equals(msgInstance)) { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + msg.debugIdentity() + ")"); } return false; } @@ -369,8 +386,8 @@ public class SubscriptionImpl implements Subscription { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + msg.debugIdentity() + ")"); } return false; } @@ -383,19 +400,26 @@ public class SubscriptionImpl implements Subscription if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); + _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity()); } return checkFilters(msg); } + private String id = String.valueOf(System.identityHashCode(this)); + + private String debugIdentity() + { + return id; + } + private boolean checkFilters(AMQMessage msg) { if (_filters != null) { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has filters."); + _logger.trace("(" + debugIdentity() + ") has filters."); } return _filters.allAllow(msg); } @@ -403,7 +427,7 @@ public class SubscriptionImpl implements Subscription { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no filters"); + _logger.trace("(" + debugIdentity() + ") has no filters"); } return true; @@ -445,15 +469,19 @@ public class SubscriptionImpl implements Subscription } _sendLock.set(true); - } + if (_logger.isInfoEnabled()) { - _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + _logger.info("Closing subscription (" + debugIdentity() + "):" + this); } if (_resendQueue != null && !_resendQueue.isEmpty()) { + if (_logger.isInfoEnabled()) + { + _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this); + } requeue(); } @@ -486,6 +514,11 @@ public class SubscriptionImpl implements Subscription { AMQMessage resent = _resendQueue.poll(); + if (_logger.isTraceEnabled()) + { + _logger.trace("Removed for resending:" + resent.debugIdentity()); + } + resent.release(); _queue.subscriberHasPendingResend(false, this, resent); @@ -495,7 +528,7 @@ public class SubscriptionImpl implements Subscription } catch (AMQException e) { - _logger.error("Unable to re-deliver messages", e); + _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e); } } |
