diff options
Diffstat (limited to 'java')
30 files changed, 1599 insertions, 454 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index be2cee79ee..5dd6619cff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -99,7 +99,7 @@ public class AMQChannel private final MessageRouter _exchanges; - private TransactionalContext _txnContext; + private TransactionalContext _txnContext, _nonTransactedContext; /** * A context used by the message store enabling it to track context for a given channel even across thread @@ -113,9 +113,9 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + //Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -210,9 +210,9 @@ public class AMQChannel } else { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Content header received on channel " + _channelId); + _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } _currentMessage.setContentHeaderBody(contentHeaderBody); routeCurrentMessage(); @@ -234,9 +234,9 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Content body received on channel " + _channelId); + _log.trace(debugIdentity() + "Content body received on channel " + _channelId); } try { @@ -289,8 +289,10 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber - * @param noLocal - * @param exclusive + * @param noLocal Flag stopping own messages being receivied. + * @param exclusive Flag requesting exclusive access to the queue + * @param acks Are acks enabled for this subscriber + * @param filters Filters to apply to this subscriber * * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * @@ -327,6 +329,8 @@ public class AMQChannel /** * Called from the protocol session to close this channel and clean up. T * + * @param session The session to close + * * @throws AMQException if there is an error during closure */ public void close(AMQProtocolSession session) throws AMQException @@ -352,10 +356,23 @@ public class AMQChannel * @param message the message that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) + * @param consumerTag The tag for the consumer that is to acknowledge this message. * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) { + if (_log.isDebugEnabled()) + { + if (queue == null) + { + _log.debug("Adding unacked message with a null queue:" + message.debugIdentity()); + } + else + { + _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity()); + } + } + synchronized (_unacknowledgedMessageMap.getLock()) { _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); @@ -363,8 +380,15 @@ public class AMQChannel } } + private final String id = "(" + System.identityHashCode(this) + ")"; + + public String debugIdentity() + { + return _channelId + id; + } + /** - * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to + * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to * this same channel or to other subscribers. * * @throws org.apache.qpid.AMQException if the requeue fails @@ -374,11 +398,22 @@ public class AMQChannel // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); - TransactionalContext nontransacted = null; + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } @@ -386,72 +421,130 @@ public class AMQChannel { if (unacked.queue != null) { - // Deliver these messages out of the transaction as their delivery was never - // part of the transaction only the receive. - if (!(_txnContext instanceof NonTransactionalContext)) - { - nontransacted.deliver(unacked.message, unacked.queue, false); - } - else - { - _txnContext.deliver(unacked.message, unacked.queue, false); - } + unacked.message.setRedelivered(true); + + // Deliver Message + deliveryContext.deliver(unacked.message, unacked.queue, false); + + // Should we allow access To the DM to directy deliver the message? + // As we don't need to check for Consumers or worry about incrementing the message count? +// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } } } + /** + * Requeue a single message + * + * @param deliveryTag The message to requeue + * + * @throws AMQException If something goes wrong. + */ public void requeue(long deliveryTag) throws AMQException { UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { - TransactionalContext nontransacted = null; + + // Ensure message is released for redelivery + unacked.message.release(); + + // Mark message redelivered + unacked.message.setRedelivered(true); + + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } - if (!(_txnContext instanceof NonTransactionalContext)) + + if (unacked.queue != null) { - nontransacted.deliver(unacked.message, unacked.queue, false); + //Redeliver the messages to the front of the queue + deliveryContext.deliver(unacked.message, unacked.queue, true); + + unacked.message.decrementReference(_storeContext); } else { - _txnContext.deliver(unacked.message, unacked.queue, false); + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag + + " but no queue defined and no DeadLetter queue so DROPPING message."); +// _log.error("Requested requeue of message:" + deliveryTag + +// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); +// +// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); +// +// unacked.message.decrementReference(_storeContext); } - unacked.message.decrementReference(_storeContext); } else { - _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists"); + _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); + + if (_log.isDebugEnabled()) + { + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + int count = 0; + + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" + + "[" + message.deliveryTag + "]"); + return false; // Continue + } + + public void visitComplete() + { + + } + }); + } } } - /** Called to resend all outstanding unacknowledged messages to this same channel. - * @param session the session - * @param requeue if true then requeue, else resend - * @throws org.apache.qpid.AMQException */ - public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException + /** + * Called to resend all outstanding unacknowledged messages to this same channel. + * + * @param requeue Are the messages to be requeued or dropped. + * + * @throws AMQException When something goes wrong. + */ + public void resend(final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); if (_log.isInfoEnabled()) { - _log.info("unacked map contains " + _unacknowledgedMessageMap.size()); + _log.info("unacked map Size:" + _unacknowledgedMessageMap.size()); } + // Process the Unacked-Map. + // Marking messages who still have a consumer for to be resent + // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException { - long deliveryTag = message.deliveryTag; AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); @@ -503,14 +596,21 @@ public class AMQChannel { if (!msgToResend.isEmpty()) { - _log.info("Preparing (" + msgToResend.size() + ") message to resend to."); + _log.info("Preparing (" + msgToResend.size() + ") message to resend."); + } + else + { + _log.info("No message to resend."); } } for (UnacknowledgedMessage message : msgToResend) { AMQMessage msg = message.message; - // Our Java Client will always suspend the channel when resending!! + // Our Java Client will always suspend the channel when resending! + // If the client has requested the messages be resent then it is + // their responsibility to ensure that thay are capable of receiving them + // i.e. The channel hasn't been server side suspended. // if (isSuspended()) // { // _log.info("Channel is suspended so requeuing"); @@ -518,50 +618,58 @@ public class AMQChannel // msgToRequeue.add(message); // } // else - { - //release to allow it to be delivered - msg.release(); +// { + //release to allow it to be delivered + msg.release(); - // Without any details from the client about what has been processed we have to mark - // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + // Without any details from the client about what has been processed we have to mark + // all messages in the unacked map as redelivered. + msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(); + Subscription sub = msg.getDeliveredSubscription(); - if (sub != null) + if (sub != null) + { + // Get the lock so we can tell if the sub scription has closed. + // will stop delivery to this subscription until the lock is released. + // note: this approach would allow the use of a single queue if the + // PreDeliveryQueue would allow head additions. + // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute.. + // needs guidance from AMQP WG Model SIG + synchronized (sub.getSendLock()) { - synchronized (sub.getSendLock()) + if (sub.isClosed()) { - if (sub.isClosed()) + if (_log.isDebugEnabled()) { - _log.info("Subscription closed during resend so requeuing message"); - //move this message to requeue - msgToRequeue.add(message); + _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message"); } - else + //move this message to requeue + msgToRequeue.add(message); + } + else + { + if (_log.isDebugEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend"); - } - // Will throw an exception if the sub is closed - sub.addToResendQueue(msg); - _unacknowledgedMessageMap.remove(message.deliveryTag); - // Don't decrement as we are bypassing the normal deliver which increments - // this is what there is a decrement on the Requeue as deliver will increment. - // msg.decrementReference(_storeContext); + _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub)); } + sub.addToResendQueue(msg); + _unacknowledgedMessageMap.remove(message.deliveryTag); + // Don't decrement as we are bypassing the normal deliver which increments + // this is why there is a decrement on the Requeue as deliver will increment. + // msg.decrementReference(_storeContext); } - } - else - { - _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); - //move this message to requeue - msgToRequeue.add(message); - } + } // sync(sub.getSendLock) } - } + else + { + _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); + //move this message to requeue + msgToRequeue.add(message); + } + } // for all messages +// } else !isSuspend if (_log.isInfoEnabled()) { @@ -571,26 +679,31 @@ public class AMQChannel } } - TransactionalContext nontransacted = null; + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } // Process Messages to Requeue at the front of the queue for (UnacknowledgedMessage message : msgToRequeue) { - // Deliver these messages out of the transaction as their delivery was never - // part of the transaction only the receive. - if (!(_txnContext instanceof NonTransactionalContext)) - { - nontransacted.deliver(message.message, message.queue, true); - } - else - { - _txnContext.deliver(message.message, message.queue, true); - } + message.message.release(); + message.message.setRedelivered(true); + + deliveryContext.deliver(message.message, message.queue, true); _unacknowledgedMessageMap.remove(message.deliveryTag); message.message.decrementReference(_storeContext); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 3f2348b71b..940b5b2bf1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -42,6 +42,21 @@ public class UnacknowledgedMessage message.incrementReference(); } + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("Q:"); + sb.append(queue); + sb.append(" M:"); + sb.append(message); + sb.append(" CT:"); + sb.append(consumerTag); + sb.append(" DT:"); + sb.append(deliveryTag); + + return sb.toString(); + } + public void discard(StoreContext storeContext) throws AMQException { if (queue != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 99cc60011a..30bbdea2ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -196,25 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } } - - public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException - { - synchronized (_lock) - { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) - { - long deliveryTag = entry.getKey(); - AMQShortString consumerTag = entry.getValue().consumerTag; - AMQMessage msg = entry.getValue().message; - - if(consumerTag != null) - { - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag); - } - } - } - } - + public UnacknowledgedMessage get(long key) { synchronized (_lock) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index f93b2b25e6..a6972475a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,12 +47,13 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + BasicAckBody body = evt.getMethod(); if (_log.isDebugEnabled()) { - _log.debug("Ack received on channel " + evt.getChannelId()); + _log.debug("Ack(Tag:" + body.deliveryTag + ":Mult:" + body.multiple + ") received on channel " + evt.getChannelId()); } - BasicAckBody body = evt.getMethod(); + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); if (channel == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index bc11e4652c..a436c35473 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.resend(session, body.requeue); + channel.resend(body.requeue); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index ed13092ded..4e77a5e8b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -49,20 +50,67 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { AMQProtocolSession session = stateManager.getProtocolSession(); - _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue); - int channelId = evt.getChannelId(); - UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag); - - _logger.info("Need to reject message:" + message); -// if (evt.getMethod().requeue) -// { -// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag); -// } -// else -// { -// // session.getChannel(channelId).resend(message); -// } + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channelId); + } + + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(channelId); + } + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channel.debugIdentity()); + } + + long deliveryTag = evt.getMethod().deliveryTag; + + UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + + if (message == null) + { + _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); +// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); + } + else + { + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channel.debugIdentity()); + } + + // If we haven't requested message to be resent to this consumer then reject it from ever getting it. +// if (!evt.getMethod().resend) + { + message.message.reject(message.message.getDeliveredSubscription()); + } + + if (evt.getMethod().requeue) + { + channel.requeue(deliveryTag); + } + else + { + _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); +// message.queue = channel.getDefaultDeadLetterQueue(); +// channel.requeue(deliveryTag); + } + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 9a8fce7129..777784ca30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -63,6 +63,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos } session.closeChannel(channelId); + // Client requested closure so we don't wait for ok we send it + stateManager.getProtocolSession().closeChannelOk(channelId); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index a10f44f906..f747f7a840 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -63,7 +63,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). // Why, are we not allowed to send messages back to client before the ok method? - channel.resend(session, false); + channel.resend(false); } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d71f6e3046..133f4809b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -523,8 +523,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { try { - markChannelawaitingCloseOk(channelId); channel.close(this); + markChannelawaitingCloseOk(channelId); } finally { @@ -546,7 +546,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * In our current implementation this is used by the clustering code. * - * @param channelId + * @param channelId The channel to remove */ public void removeChannel(int channelId) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index dedea68d18..6d375c89fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -85,11 +85,18 @@ public class AMQMessage private Subscription _takenBySubcription; + private Set<Subscription> _rejectedBy = null; + public boolean isTaken() { return _taken.get(); } + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")"; + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -199,7 +206,7 @@ public class AMQMessage _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { - _log.debug("Message created with id " + messageId); + _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId); } } @@ -452,7 +459,9 @@ public class AMQMessage public void release() { + _log.trace("Releasing Message:" + debugIdentity()); _taken.set(false); + _takenBySubcription = null; } public boolean checkToken(Object token) @@ -511,7 +520,7 @@ public class AMQMessage * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException, AMQException + public void checkDeliveredToConsumer() throws NoConsumersException { if (_immediate && !_deliveredToConsumer) @@ -580,7 +589,8 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { - _txnContext.deliver(this, q, true); + //normal deliver so add this message at the end. + _txnContext.deliver(this, q, false); } } finally @@ -801,7 +811,7 @@ public class AMQMessage public String toString() { - return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + _taken + " by:" + _takenBySubcription; } @@ -809,4 +819,35 @@ public class AMQMessage { return _takenBySubcription; } + + public void reject(Subscription subscription) + { + if (subscription != null) + { + if (_rejectedBy == null) + { + _rejectedBy = new HashSet<Subscription>(); + } + + _rejectedBy.add(subscription); + } + else + { + _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + } + } + + public boolean isRejectedBy(Subscription subscription) + { + boolean rejected = _rejectedBy != null; + + if (rejected) // We have subscriptions that rejected this message + { + return _rejectedBy.contains(subscription); + } + else // This messasge hasn't been rejected yet. + { + return rejected; + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 5bbe1671a7..7c2fe73386 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; */ public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException { @@ -446,7 +447,11 @@ public class AMQQueue implements Managable, Comparable setExclusive(true); } - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " + + "consumer tag {2} with {3}", ps, channel, consumerTag, this)); + } Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); @@ -486,8 +491,11 @@ public class AMQQueue implements Managable, Comparable public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { - debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, + this)); + } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, @@ -506,6 +514,10 @@ public class AMQQueue implements Managable, Comparable // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { + if (_logger.isInfoEnabled()) + { + _logger.warn("Auto-deleteing queue:" + this); + } autodelete(); // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared @@ -561,14 +573,18 @@ public class AMQQueue implements Managable, Comparable protected void autodelete() throws AMQException { - debug("autodeleting {0}", this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("autodeleting {0}", this)); + } delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException + public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { //fixme not sure what this is doing. should we be passing deliverFirst through here? - _deliveryMgr.deliver(storeContext, getName(), msg, false); + // This code is not used so when it is perhaps it should + _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -582,6 +598,10 @@ public class AMQQueue implements Managable, Comparable } } +// public DeliveryManager getDeliveryManager() +// { +// return _deliveryMgr; +// } public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { @@ -673,14 +693,6 @@ public class AMQQueue implements Managable, Comparable return "Queue(" + _name + ")@" + System.identityHashCode(this); } - private void debug(String msg, Object... args) - { - if (_logger.isDebugEnabled()) - { - _logger.debug(MessageFormat.format(msg, args)); - } - } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException { return _deliveryMgr.performGet(session, channel, acks); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index e70926736d..601effcec7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.MessageQueue; import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** Manages delivery of messages on behalf of a queue */ @@ -86,6 +85,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private AtomicLong _totalMessageSize = new AtomicLong(); private AtomicInteger _extraMessages = new AtomicInteger(); private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); + private final Object _queueHeadLock = new Object(); + private String _processingThreadName = ""; ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -118,7 +119,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (deliverFirst) { - _messages.pushHead(msg); + synchronized (_queueHeadLock) + { + _messages.pushHead(msg); + } } else { @@ -367,16 +371,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long count = 0; _lock.lock(); - AMQMessage msg = getNextMessage(); - while (msg != null) + synchronized (_queueHeadLock) { - //mark this message as taken and get it removed - msg.taken(null); - _queue.dequeue(storeContext, msg); - msg = getNextMessage(); - count++; - } + AMQMessage msg = getNextMessage(); + while (msg != null) + { + //and remove it + _messages.poll(); + _queue.dequeue(storeContext, msg); + msg = getNextMessage(); + count++; + } + } _lock.unlock(); return count; } @@ -390,12 +397,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - - while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub))) + while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub))) { //remove the already taken message - messages.poll(); + AMQMessage removed = messages.poll(); + + assert removed == message; + _totalMessageSize.addAndGet(-message.getSize()); + + if (_log.isTraceEnabled()) + { + _log.trace("Removed taken message:" + message.debugIdentity()); + } + // try the next message message = messages.peek(); } @@ -409,7 +424,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isTraceEnabled()) { - _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + ") AMQQueue (" + System.identityHashCode(queue) + ")"); } @@ -417,46 +432,63 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == null) { // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector - if (_log.isDebugEnabled()) + if (_log.isInfoEnabled()) { - _log.debug(sub + ": asked to send messages but has none on given queue:" + queue); + _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue); } return; } AMQMessage message = null; + AMQMessage removed = null; try { - message = getNextMessage(messageQueue, sub); - - // message will be null if we have no messages in the messageQueue. - if (message == null) + synchronized (_queueHeadLock) { - if (_log.isTraceEnabled()) + message = getNextMessage(messageQueue, sub); + + // message will be null if we have no messages in the messageQueue. + if (message == null) { - _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + if (_log.isTraceEnabled()) + { + _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } + return; + } + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } - return; + + sub.send(message, _queue); + + //remove sent message from our queue. + removed = messageQueue.poll(); + //If we don't remove the message from _messages + // Otherwise the Async send will never end } + + if (removed != message) + { + _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed); + } + if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } - sub.send(message, _queue); - - //remove sent message from our queue. - messageQueue.poll(); - //If we don't remove the message from _messages - // Otherwise the Async send will never end if (messageQueue == sub.getResendQueue()) { if (_log.isTraceEnabled()) { - _log.trace("All messages sent from resendQueue for " + sub); + _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub); } if (messageQueue.isEmpty()) { @@ -469,7 +501,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else if (messageQueue == sub.getPreDeliveryQueue()) { - _log.info("We could do clean up of the main _message queue here"); + if (_log.isInfoEnabled()) + { + _log.info(debugIdentity() + "We could do clean up of the main _message queue here"); + } } _totalMessageSize.addAndGet(-message.getSize()); @@ -477,7 +512,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager catch (AMQException e) { message.release(); - _log.error("Unable to deliver message as dequeue failed: " + e, e); + _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e); } } @@ -516,6 +551,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private void processQueue() { + //record thread name + if (_log.isDebugEnabled()) + { + _processingThreadName = Thread.currentThread().getName(); + } + // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -561,9 +602,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg); + _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg); } - msg.release(); + // This shouldn't be done here. +// msg.release(); //Check if we have someone to deliver the message to. _lock.lock(); @@ -575,7 +617,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); + _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); } if (!msg.getMessagePublishInfo().isImmediate()) { @@ -587,7 +629,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to:" + currentStatus()); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -598,7 +640,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); } continue; @@ -609,7 +651,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg, deliverFirst); @@ -625,9 +667,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } msg.taken(s); @@ -636,33 +678,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { - if (_log.isDebugEnabled()) + if (_log.isInfoEnabled()) { - _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + + "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); } } + } - if (!msg.isTaken()) + if (!msg.isTaken()) + { + if (_log.isInfoEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + - " Subscriber:" + System.identityHashCode(s)); - } - - deliver(context, name, msg, deliverFirst); + _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); } - else + + deliver(context, name, msg, deliverFirst); + } + else + { + if (_log.isDebugEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug(id() + " Message(" + System.identityHashCode(msg) + - ") has been taken so disregarding deliver request to Subscriber:" + - System.identityHashCode(s)); - } + _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); } } } + } finally { @@ -674,10 +718,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - //fixme remove private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")"; - private String id() + private String debugIdentity() { return id; } @@ -710,7 +753,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async." + currentStatus()); + _log.debug(debugIdentity() + "Processing Async." + currentStatus()); } if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) @@ -725,14 +768,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get() + - " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") "; + " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false"); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 0a2e73880c..20033daac7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -46,6 +46,8 @@ import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; */ public class SubscriptionImpl implements Subscription { + + private static final Logger _suspensionlogger = Logger.getLogger("Suspension"); private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); public final AMQChannel channel; @@ -258,6 +260,12 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } + + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -265,56 +273,56 @@ public class SubscriptionImpl implements Subscription private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) throws AMQException { - try - { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) + { + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); - } - queue.dequeue(storeContext, msg); + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); } - synchronized (channel) - { - long deliveryTag = channel.getNextDeliveryTag(); - - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - msg.decrementReference(storeContext); - } + queue.dequeue(storeContext, msg); + } + synchronized (channel) + { + long deliveryTag = channel.getNextDeliveryTag(); - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + msg.decrementReference(storeContext); } - } - finally - { + + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + //Only set delivered if it actually was writen successfully.. + // using a try->finally would set it even if an error occured. msg.setDeliveredToConsumer(); } } public boolean isSuspended() { - if (_logger.isTraceEnabled()) + if (_suspensionlogger.isInfoEnabled()) { if (channel.isSuspended()) { - _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + _suspensionlogger.info("Subscription(" + debugIdentity() + ") channel's is susupended"); } if (_sendLock.get()) { - _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + _suspensionlogger.info("Subscription(" + debugIdentity() + ") has sendLock set so closing."); } } return channel.isSuspended() || _sendLock.get(); @@ -323,7 +331,7 @@ public class SubscriptionImpl implements Subscription /** * Callback indicating that a queue has been deleted. * - * @param queue + * @param queue The queue to delete */ public void queueDeleted(AMQQueue queue) throws AMQException { @@ -337,9 +345,18 @@ public class SubscriptionImpl implements Subscription public boolean hasInterest(AMQMessage msg) { + //check that the message hasn't been rejected + if (msg.isRejectedBy(this)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity()); + } +// return false; + } + if (_noLocal) { - boolean isLocal; // We don't want local messages so check to see if message is one we sent Object localInstance; Object msgInstance; @@ -350,12 +367,12 @@ public class SubscriptionImpl implements Subscription if ((msg.getPublisher().getClientProperties() != null) && (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + if (localInstance == msgInstance || localInstance.equals(msgInstance)) { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + msg.debugIdentity() + ")"); } return false; } @@ -369,8 +386,8 @@ public class SubscriptionImpl implements Subscription { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + msg.debugIdentity() + ")"); } return false; } @@ -383,19 +400,26 @@ public class SubscriptionImpl implements Subscription if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); + _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity()); } return checkFilters(msg); } + private String id = String.valueOf(System.identityHashCode(this)); + + private String debugIdentity() + { + return id; + } + private boolean checkFilters(AMQMessage msg) { if (_filters != null) { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has filters."); + _logger.trace("(" + debugIdentity() + ") has filters."); } return _filters.allAllow(msg); } @@ -403,7 +427,7 @@ public class SubscriptionImpl implements Subscription { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no filters"); + _logger.trace("(" + debugIdentity() + ") has no filters"); } return true; @@ -445,15 +469,19 @@ public class SubscriptionImpl implements Subscription } _sendLock.set(true); - } + if (_logger.isInfoEnabled()) { - _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + _logger.info("Closing subscription (" + debugIdentity() + "):" + this); } if (_resendQueue != null && !_resendQueue.isEmpty()) { + if (_logger.isInfoEnabled()) + { + _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this); + } requeue(); } @@ -486,6 +514,11 @@ public class SubscriptionImpl implements Subscription { AMQMessage resent = _resendQueue.poll(); + if (_logger.isTraceEnabled()) + { + _logger.trace("Removed for resending:" + resent.debugIdentity()); + } + resent.release(); _queue.subscriberHasPendingResend(false, this, resent); @@ -495,7 +528,7 @@ public class SubscriptionImpl implements Subscription } catch (AMQException e) { - _logger.error("Unable to re-deliver messages", e); + _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e); } } diff --git a/java/client/pom.xml b/java/client/pom.xml index 617390c059..abac5b3f1a 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -55,6 +55,13 @@ <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> + <!-- commons collection exports log4j v1.2.7 which doesn't have trace()--> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89f596e541..61143eee69 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -198,9 +198,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _suspensionLock = new Object(); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private class Dispatcher extends Thread { @@ -212,12 +213,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Dispatcher() { super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } } public void run() { + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " started"); + } + UnprocessedMessage message; + // Allow disptacher to start stopped + synchronized (_lock) + { + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } + } + try { while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) @@ -243,10 +269,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (InterruptedException e) { - ; + //ignore + } + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); } - - _logger.info("Dispatcher thread terminating for channel " + _channelId); } // only call while holding lock @@ -263,6 +291,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") + + ": Currently " + (currently ? "Started" : "Stopped")); + } } return currently; } @@ -275,9 +309,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { - _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); - _logger.warn("Consumers that exist: " + _consumers); - _logger.warn("Session hashcode: " + System.identityHashCode(this)); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)..."); + } + + rejectMessage(message, true); } else { @@ -311,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectAllMessages(true); - _logger.debug("Session Pre Dispatch Queue cleared"); + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); for (BasicMessageConsumer consumer : _consumers.values()) { @@ -323,20 +362,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void rejectPending(AMQShortString consumerTag) + public void rejectPending(BasicMessageConsumer consumer) { synchronized (_lock) { - boolean stopped = connectionStopped(); + boolean stopped = _dispatcher.connectionStopped(); - _dispatcher.setConnectionStopped(false); - - rejectMessagesForConsumerTag(consumerTag, true); - - if (stopped) + if (!stopped) { - _dispatcher.setConnectionStopped(stopped); + _dispatcher.setConnectionStopped(true); } + + // Reject messages on pre-receive queue + consumer.rollback(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + + // Remove consumer from map. + deregisterConsumer(consumer); + + _dispatcher.setConnectionStopped(stopped); + } } } @@ -549,14 +596,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi suspendChannel(true); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - if (_dispatcher != null) { _dispatcher.rollback(); } + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (!isSuspended) { suspendChannel(false); @@ -663,14 +711,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi jmse = e; } } - finally + if (jmse != null) { - if (jmse != null) - { - throw jmse; - } + throw jmse; } - } @@ -835,6 +879,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.clearUnackedMessages(); } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -844,11 +893,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); - if (_dispatcher != null) - { - _dispatcher.rollback(); - } - if (!isSuspended) { suspendChannel(false); @@ -1223,35 +1267,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return (counter != null) && (counter.get() != 0); } - - public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler()); + declareExchange(name, type, getProtocolHandler(), nowait); } - public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException + private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, @@ -1261,7 +1287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // durable name, // exchange false, // internal - false, // nowait + nowait, // nowait false, // passive getTicket(), // ticket type); // type @@ -1874,15 +1900,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { + startDistpatcherIfNecessary(false); + } + + synchronized void startDistpatcherIfNecessary(boolean initiallyStopped) + { if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); + _dispatcher.setConnectionStopped(initiallyStopped); _dispatcher.start(); } else { - _dispatcher.setConnectionStopped(false); + _dispatcher.setConnectionStopped(initiallyStopped); } } @@ -1910,7 +1942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler); + declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler); @@ -1950,12 +1982,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } - - //ensure we remove the messages from the consumer even if the dispatcher hasn't started - if (_dispatcher == null) - { - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2033,6 +2059,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void confirmConsumerCancelled(AMQShortString consumerTag) { + + // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) { @@ -2040,26 +2068,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.closeWhenNoMessages(true); } + + //Clean the Maps up first + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } else { - consumer.rollback(); + _logger.info("Dispatcher is null so created stopped dispatcher"); + + startDistpatcherIfNecessary(true); } - } - //Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _dispatcher.rejectPending(consumerTag); + _dispatcher.rejectPending(consumer); } else { - rejectMessagesForConsumerTag(consumerTag, true); + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } + + } /* - * I could have combined the last 3 methods, but this way it improves readability - */ + * I could have combined the last 3 methods, but this way it improves readability + */ private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) @@ -2189,16 +2224,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removing message from _queue:" + message); + _logger.debug("Removing message(" + System.identityHashCode(message) + + ") from _queue DT:" + message.getDeliverBody().deliveryTag); } messages.remove(); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message, requeue); - _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } } else { @@ -2207,15 +2246,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + public void rejectMessage(UnprocessedMessage message, boolean requeue) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } + + rejectMessage(message.getDeliverBody().deliveryTag, requeue); + } + + public void rejectMessage(AbstractJMSMessage message, boolean requeue) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); + } + rejectMessage(message.getDeliveryTag(), requeue); + + } + public void rejectMessage(long deliveryTag, boolean requeue) { - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || + _acknowledgeMode == SESSION_TRANSACTED) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + deliveryTag, + requeue); - _connection.getProtocolHandler().writeFrame(basicRejectBody); + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9b914425a..9043faa80c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -109,9 +110,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; - /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */ - private long _lastDeliveryTag; - /** * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding * number of msgs >= _prefetchHigh and disabled at < _prefetchLow @@ -120,6 +118,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -432,6 +433,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing consumer:" + debugIdentity()); + } + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) @@ -448,6 +454,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("CancelOk'd for consumer:" + debugIdentity()); + } + } catch (AMQException e) { @@ -456,11 +468,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - deregisterConsumer(); - _unacknowledgedDeliveryTags.clear(); + //done in BasicCancelOK Handler + //deregisterConsumer(); if (_messageListener != null && _receiving.get()) { - _logger.info("Interrupting thread: " + _receivingThread); + if (_logger.isInfoEnabled()) + { + _logger.info("Interrupting thread: " + _receivingThread); + } _receivingThread.interrupt(); } } @@ -616,7 +631,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _lastDeliveryTag = msg.getDeliveryTag(); + _receivedDeliveryTags.add(msg.getDeliveryTag()); } break; } @@ -625,10 +640,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { - if (_lastDeliveryTag > 0) + if (!_receivedDeliveryTags.isEmpty()) { - _session.acknowledgeMessage(_lastDeliveryTag, true); - _lastDeliveryTag = -1; + long lastDeliveryTag = _receivedDeliveryTags.poll(); + + while (!_receivedDeliveryTags.isEmpty()) + { + lastDeliveryTag = _receivedDeliveryTags.poll(); + } + + _session.acknowledgeMessage(lastDeliveryTag, true); } } @@ -738,43 +759,76 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void rollback() { + clearUnackedMessages(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting received messages"); + } + //rollback received but not committed messages + while (!_receivedDeliveryTags.isEmpty()) + { + Long tag = _receivedDeliveryTags.poll(); + + if (tag != null) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting tag from _receivedDTs:" + tag); + } + + _session.rejectMessage(tag, true); + } + } + + //rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { - Object o = iterator.next(); + Object o = iterator.next(); if (o instanceof AbstractJMSMessage) { - _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o), true); if (_logger.isTraceEnabled()) { - _logger.trace("Rejected message" + o); - iterator.remove(); + _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { _logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + iterator.remove(); } } if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); + rollback(); } _synchronousQueue.clear(); } } + + public String debugIdentity() + { + return String.valueOf(_consumerTag); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index 9bd0205977..bd8177feb6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -28,27 +28,29 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.protocol.AMQMethodEvent; -/** - * @author Apache Software Foundation - */ public class BasicCancelOkMethodHandler implements StateAwareMethodListener { - private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); - private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } - public static BasicCancelOkMethodHandler getInstance() - { - return _instance; - } + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - private BasicCancelOkMethodHandler() - { - } + if (_logger.isInfoEnabled()) + { + _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag); + } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New BasicCancelOk method received"); - BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); - } + protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index ddf79ec907..b176df87fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -30,13 +30,11 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; /** - * This class contains everything needed to process a JMS message. It assembles the - * deliver body, the content header and the content body/ies. - * - * Note that the actual work of creating a JMS message for the client code's use is done - * outside of the MINA dispatcher thread in order to minimise the amount of work done in - * the MINA dispatcher thread. + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ public class UnprocessedMessage { @@ -47,9 +45,7 @@ public class UnprocessedMessage private final int _channelId; private ContentHeaderBody _contentHeader; - /** - * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general - */ + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) @@ -74,9 +70,9 @@ public class UnprocessedMessage { final long payloadSize = body.payload.remaining(); - if(_bodies == null) + if (_bodies == null) { - if(payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().bodySize) { _bodies = Collections.singletonList(body); } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index b2940d73ae..8a0b5e7d84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -58,6 +58,7 @@ public class StateWaiter implements StateListener { _logger.debug("State " + _state + " not achieved so waiting..."); _monitor.wait(TIME_OUT); + //fixme this won't cause the timeout to exit the loop. need to set _throwable } catch (InterruptedException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 03e7d399ce..cb4ef01d25 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.client.util; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.log4j.Logger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 338404a431..4667a2b3fa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -73,7 +73,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -130,7 +131,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 3431c56783..51bbe7d0e6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -109,6 +109,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (AMQException e) { + if (_logger.isInfoEnabled()) + { + _logger.info("Exception occured was:" + e.getErrorCode()); + } assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -315,15 +319,15 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (JMSException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (AMQException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (URLSyntaxException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java new file mode 100644 index 0000000000..a56bae3d70 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.TestCase; + +import java.util.concurrent.atomic.AtomicInteger; + + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.TextMessage; +import javax.jms.MessageConsumer; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; +import org.apache.log4j.Level; + +public class MessageRequeueTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); + protected final Integer numTestMessages = 150; + + protected final int consumeTimeout = 3000; + + protected final String queue = "direct://amq.direct//queue"; + protected String payload = "Message:"; + + protected final String BROKER = "vm://:1"; + private boolean testReception = true; + + private long[] receieved = new long[numTestMessages + 1]; + private boolean passed=false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queue, payload, numTestMessages); + // close this connection + conn.disconnect(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + if (!passed) + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + } + TransportConnection.killVMBroker(1); + } + + /** multiple consumers */ + public void testDrain() throws JMSException, InterruptedException + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consuming queue " + queue); + Queue q = conn.getSession().createQueue(queue); + + final MessageConsumer consumer = conn.getSession().createConsumer(q); + int messagesReceived = 0; + + long messageLog[] = new long[numTestMessages + 1]; + + _logger.info("consuming..."); + Message msg = consumer.receive(1000); + while (msg != null) + { + messagesReceived++; + + long dt = ((AbstractJMSMessage) msg).getDeliveryTag(); + + int msgindex = msg.getIntProperty("index"); + if (messageLog[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + messageLog[msgindex] = dt; + + //get Next message + msg = consumer.receive(1000); + } + + conn.getSession().commit(); + consumer.close(); + assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); + + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : messageLog) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + + index++; + } + assertEquals(list.toString(), 0, failed); + _logger.info("consumed: " + messagesReceived); + conn.disconnect(); + } + + /** multiple consumers */ + public void testTwoCompetingConsumers() + { + Consumer c1 = new Consumer(); + Consumer c2 = new Consumer(); + Consumer c3 = new Consumer(); + Consumer c4 = new Consumer(); + + Thread t1 = new Thread(c1); + Thread t2 = new Thread(c2); + Thread t3 = new Thread(c3); + Thread t4 = new Thread(c4); + + t1.start(); +// t2.start(); +// t3.start(); +// t4.start(); + + try + { + t1.join(); + t2.join(); + t3.join(); + t4.join(); + } + catch (InterruptedException e) + { + fail("Uanble to join to Consumer theads"); + } + + _logger.info("consumer 1 count is " + c1.getCount()); + _logger.info("consumer 2 count is " + c2.getCount()); + _logger.info("consumer 3 count is " + c3.getCount()); + _logger.info("consumer 4 count is " + c4.getCount()); + + Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount(); + + // Check all messages were correctly delivered + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : receieved) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + index++; + } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); + assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); + passed=true; + } + + class Consumer implements Runnable + { + private Integer count = 0; + private Integer id; + + public Consumer() + { + id = consumerIds.addAndGet(1); + } + + public void run() + { + try + { + _logger.info("consumer-" + id + ": starting"); + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consumer-" + id + ": connected, consuming..."); + Message result; + do + { + result = conn.getNextMessage(queue, consumeTimeout); + if (result != null) + { + + long dt = ((AbstractJMSMessage) result).getDeliveryTag(); + + if (testReception) + { + int msgindex = result.getIntProperty("index"); + if (receieved[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + receieved[msgindex] = dt; + } + + + count++; + if (count % 100 == 0) + { + _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); + } + } + } + while (result != null); + + _logger.info("consumer-" + id + ": complete"); + conn.disconnect(); + + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + public Integer getCount() + { + return count; + } + + public Integer getId() + { + return id; + } + } + + + public class QpidClientConnection implements ExceptionListener + { + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection() + { + super(); + setVirtualHost("/test"); + setBrokerList(BROKER); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } + } + + + public void testRequeue() throws JMSException, AMQException, URLSyntaxException + { + String virtualHost = "/test"; + String brokerlist = "vm://:1"; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q = session.createQueue(queue); + + _logger.info("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + // + } + + _logger.info("Receiving msg"); + Message msg = consumer.receive(); + + assertNotNull("Message should not be null", msg); + + _logger.info("Close Consumer"); + consumer.close(); + + _logger.info("Close Connection"); + conn.close(); + } + +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 07ef5f04d4..fb5ea58174 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -80,7 +80,8 @@ public class StreamMessageTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0d75a6b968..2abc139ced 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -43,7 +43,8 @@ import javax.jms.TextMessage; public class CommitRollbackTest extends TestCase { protected AMQConnection conn; - protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected static int testMethod = 0; protected String payload = "xyzzy"; private Session _session; private MessageProducer _publisher; @@ -57,6 +58,11 @@ public class CommitRollbackTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + + testMethod++; + queue += testMethod; + + newConnection(); } @@ -84,7 +90,11 @@ public class CommitRollbackTest extends TestCase TransportConnection.killVMBroker(1); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -109,7 +119,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and disconnected before commit, but is still present", result); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenCloseDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -140,6 +154,8 @@ public class CommitRollbackTest extends TestCase /** * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different * session as producer + * + * @throws Exception On error */ public void testPutThenRollback() throws Exception { @@ -160,7 +176,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and rolled back, but is still present", result); } - /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + /** + * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection + * + * @throws Exception On error + */ public void testGetThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -194,6 +214,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the * same connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseDisconnect() throws Exception { @@ -230,6 +252,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt * session to the producer + * + * @throws Exception On error */ public void testGetThenRollback() throws Exception { @@ -266,6 +290,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same * connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseRollback() throws Exception { @@ -304,7 +330,11 @@ public class CommitRollbackTest extends TestCase } - /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + /** + * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order + * + * @throws Exception On error + */ public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -339,37 +369,41 @@ public class CommitRollbackTest extends TestCase public void testSend2ThenCloseAfter1andTryAgain() throws Exception { -// assertTrue("session is not transacted", _session.getTransacted()); -// assertTrue("session is not transacted", _pubSession.getTransacted()); -// -// _logger.info("sending two test messages"); -// _publisher.send(_pubSession.createTextMessage("1")); -// _publisher.send(_pubSession.createTextMessage("2")); -// _pubSession.commit(); -// -// _logger.info("getting test message"); -// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); -// -// _consumer.close(); -// -// _consumer = _session.createConsumer(_jmsQueue); -// -// _logger.info("receiving result"); -// Message result = _consumer.receive(1000); -// _logger.error("1:" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("1" , ((TextMessage) result).getText()); -//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("2" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("2", ((TextMessage) result).getText()); -//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("3" + result); -// assertNull("test message should be null:" + result, result); + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + Message result = _consumer.receive(1000); + + assertNotNull("Message received should not be null", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + + + _logger.info("Closing Consumer"); + _consumer.close(); + + _logger.info("Creating New consumer"); + _consumer = _session.createConsumer(_jmsQueue); + + _logger.info("receiving result"); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNull("test message should be null:" + result, result); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 94cbb426e5..d994d4c141 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -62,69 +62,125 @@ public class TransactedTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + _logger.info("Create Connection"); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); + + _logger.info("Create Session"); session = con.createSession(true, Session.SESSION_TRANSACTED); + _logger.info("Create Q1"); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + _logger.info("Create Q2"); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - + _logger.info("Create Consumer of Q1"); consumer1 = session.createConsumer(queue1); - //Dummy just to create the queue. + //Dummy just to create the queue. + _logger.info("Create Consumer of Q2"); MessageConsumer consumer2 = session.createConsumer(queue2); + _logger.info("Close Consumer of Q2"); consumer2.close(); + + _logger.info("Create producer to Q2"); producer2 = session.createProducer(queue2); + + _logger.info("Start Connection"); con.start(); + _logger.info("Create prep connection"); prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); + + _logger.info("Create prep session"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + _logger.info("Create prep producer to Q1"); prepProducer1 = prepSession.createProducer(queue1); + + _logger.info("Create prep connection start"); prepCon.start(); - //add some messages - prepProducer1.send(prepSession.createTextMessage("A")); - prepProducer1.send(prepSession.createTextMessage("B")); - prepProducer1.send(prepSession.createTextMessage("C")); + _logger.info("Create test connection"); testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); + _logger.info("Create test session"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create test consumer of q2"); testConsumer2 = testSession.createConsumer(queue2); - } protected void tearDown() throws Exception { + _logger.info("Close connection"); con.close(); + _logger.info("Close test connection"); testCon.close(); + _logger.info("Close prep connection"); prepCon.close(); + _logger.info("Kill broker"); TransportConnection.killAllVMBrokers(); super.tearDown(); } public void testCommit() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + //send and receive some messages + _logger.info("Send X to Q2"); producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); producer2.send(session.createTextMessage("Z")); + + + _logger.info("Read A from Q1"); expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); expect("C", consumer1.receive(1000)); //commit + _logger.info("session commit"); session.commit(); + _logger.info("Start test Connection"); testCon.start(); + //ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); expect("Z", testConsumer2.receive(1000)); + _logger.info("create test session on Q1"); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); assertTrue(null == testConsumer2.receive(1000)); } public void testRollback() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + //Quick sleep to ensure all three get pre-fetched + Thread.sleep(500); + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); @@ -140,9 +196,9 @@ public class TransactedTest extends TestCase _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued - expect("A", consumer1.receive(1000)); - expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + expect("A", consumer1.receive(1000), true); + expect("B", consumer1.receive(1000), true); + expect("C", consumer1.receive(1000), true); _logger.info("Starting new connection"); testCon.start(); @@ -152,20 +208,22 @@ public class TransactedTest extends TestCase assertTrue(null == testConsumer2.receive(1000)); session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); } public void testResendsMsgsAfterSessionClose() throws Exception { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); - Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); - Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(queue3); _logger.info("Sending four messages"); @@ -176,65 +234,77 @@ public class TransactedTest extends TestCase producerSession.commit(); - _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Received and acknowledged first message"); + _logger.info("Received and committed first message"); tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg2", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg3", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg4", tm.getText()); + _logger.info("Received all four messages. Closing connection with three outstanding messages"); consumerSession.close(); - consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); consumer = consumerSession.createConsumer(queue3); // no ack for last three messages so when I call recover I expect to get three messages back - tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + _logger.info("Received redelivery of three messages. Committing"); - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Calling acknowledge with no outstanding messages"); - // all acked so no messages to be delivered + _logger.info("Called commit"); - tm = (TextMessage) consumer.receiveNoWait(); + tm = (TextMessage) consumer.receive(1000); assertNull(tm); + _logger.info("No messages redelivered as is expected"); con.close(); con2.close(); - } - private void expect(String text, Message msg) throws JMSException { + expect(text, msg, false); + } + + private void expect(String text, Message msg, boolean requeued) throws JMSException + { assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); + assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered()); } public static junit.framework.Test suite() diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java index 70209cd2a3..86cde3cee7 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -37,7 +37,7 @@ public class SimpleClusterTest extends TestCase AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); System.out.println("Session created"); - session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct")); + session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true); System.out.println("Exchange declared"); con.close(); System.out.println("Connection closed"); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index ed244396bf..379f7feb4f 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -100,7 +100,7 @@ public final class AMQConstant public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); - public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true); public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java index cdf686b4cb..883d5018cd 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -41,6 +41,11 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ return super.size() + _messageHeadSize.get(); } + public int headSize() + { + return _messageHeadSize.get(); + } + @Override public E poll() { @@ -50,10 +55,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ } else { - _logger.debug("Providing item from message head"); - E e = _messageHead.poll(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Providing item(" + e + ")from message head"); + } + + if (e != null) { _messageHeadSize.decrementAndGet(); @@ -159,8 +168,12 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ } else { - _logger.debug("Providing item from message head"); - return _messageHead.peek(); + E o = _messageHead.peek(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Peeking item (" + o + ") from message head"); + } + return o; } } @@ -186,7 +199,10 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ public boolean pushHead(E o) { - _logger.debug("Adding item to head of queue"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Adding item(" + o + ") to head of queue"); + } if (_messageHead.offer(o)) { _messageHeadSize.incrementAndGet(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index c8271f1549..87491ed3d3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -67,7 +67,8 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
|
