diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-03-06 14:12:47 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-03-06 14:12:47 +0000 |
| commit | f39d26362950a622eabbccbcc1616621d5015db1 (patch) | |
| tree | 27a8e73ee10cb483dd64fe03928275a576f7587b /java | |
| parent | 92910e32ef098b6f60b25781456157c2bcd1fe81 (diff) | |
| download | qpid-python-f39d26362950a622eabbccbcc1616621d5015db1.tar.gz | |
QPID-403 QPID-346 QPID-355 QPID-386 QPID-389 Updates to fix Transactional Rollback.
QPID-346 Message loss after rollback\recover
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state.
QPID-389 Prefetched message are not correctly returned to the queue
QPID-403 Implement Basic.Reject
Broker
UnacknowledgedMessage - Added toString for debug
UnacknowledgedMessageMapImpl - Removed resendMessages method as all sending should go via DeliveryManager and Subscription.
AMQChannel - Updated resend and requeue methods so they do not directly write messages to a subscriber. This was violating the suspension state.
- Used a local non-transactional context to requeue messages as the internal requeuing of messages on the broker should not be part of any client transaction.
- Maked messages as resent.
- Removed warnings from IDE about missing JavaDoc text etc.
BasicAckMethodHandler - Added debugging
BasicRecoverMethodHandler - Removed session from the resend call.
BasicRejectMethodHandler - Initial implementation. Hooks left for possible 'resend' bit.
ChannelCloseHandler - Fixed bug where channel wasn't marked as fully closed on reception of a close from the client.
TxRollbackHandler - Removed session from resend call.
AMQMinaProtocolSession - Fixed bug where channel was marked as awaiting closure before it had actually been closed. This causes problems as the close looks up the channel by id again which will return null after it has been marked as awaiting closure.
AMQMessage - Initial implementation of Rejection. Currently inactive in hasInterest() as we are miss-using reject to requeue prefetched messages from the client.
AMQQueue - Removed debug method as it made reading the log very difficult as all the logs had the same line number
ConcurrentSelectorDeliveryManager - Fixed clearAllMessages() as it didn't actually remove the messages.
- Fixed bad logic in getNextMessage when using null subscriber. (as done by clearAllMessages)
- Added more logging messages. Made more frequent logging a trace value.
- Added debugIdentity() method to reduce over head in calculating standard log prefix.
- Allowed messages to be added to the front of the queue.
- Added currentStatus() to an overview of the queue's current state.
SubscriptionImpl - Updated to handle closure correctly (QPID-355)
-Updated the deliver method so it doesn't use a try->finally to do msg.setDeliveredToConsumer() as this would be done even in the event of an error.
- Created an additional logger to log suspension calls rather than through the SI logger which logs a lot of detail.
Client
pom.xml - Excluded older version of log4j that commons-collections exposes.
AMQSession - Added ability for dispatcher to start in stopped state.
- Added dispatcher logger
- Added checks around logging
- Added message rejection if the dispatcher receives a message that it doesn't have a consumer for.
- Updated message rejection to allow the dispatcher to perform the rejection if running this ensures that all queued messages are processed correctly and rejection occurs in order.
- rollback() before calling rollback all pending queued messages must be rejected as rollback will clear unacked map which the rejects caused by rollback() will need.
- fixed closedProducersAndConsumers so that it will rethrow any JMS Exception
- recover() as for rollback() the rejects need to be done before the Recover Call to the broker.
- Allowed delclareExchange to be done synchronously programatically
- Updated confirmConsumerCancelled to use the dispatcher to perform the clean up. This required the creation of the dispatcher in stopped mode so that it does not start and message attempted to be delivered while the subscriber is being cancelled.
BasicMessageConsumer - Updated close not to perform the deregistration. This is done in via BasicCancelOkMethodHandler
- Added guards on logging
- Record all messages that have been received so they can be rejected if rollback occurs. so had to change impl of acknowledgeLastDelivered.
- Updated Rollback to initially reject all received messages that are still unAcked.
- Added a recursive call should the queue not be empty at the end of the rollback.. with a warning.
BasicCancelOkMethodHandler - White space changes to meet style guide. Added guard on logging.
UnprocessedMessage - White space changes to meet style guide.
StateWaiter - Added comment about timeout bug.
FlowControllingBlockingQueue - Tidied imports
RecoverTest - Updated as declareExchange is now Synchronous
ChannelCloseTest - added guard on logging
MessageRequeueTest - Added to better test underlying AMQP/Qpid state QPID-386
StreamMessageTest - Updated as declareExchange is now Synchronous
CommitRollbackTest - added Additional test case to ensure prefetch queue is correctly purged.
TransactedTest - Added logging and additional tests.
Cluster
SimpleClusterTest - updated in line with AMQSession.delcareExchange changes
Common
AMQConstant - Fixed error code 'not allowed' should be 530 not 507.
ConcurrentLinkedMessageQueueAtomicSize - Updated to beable to get the size of messages on the 'head' queue along with additional debug
Systests
ReturnUnroutableMandatoryMessageTest - Updated as declareExchange is now Synchronous
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@515127 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
30 files changed, 1599 insertions, 454 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index be2cee79ee..5dd6619cff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -99,7 +99,7 @@ public class AMQChannel private final MessageRouter _exchanges; - private TransactionalContext _txnContext; + private TransactionalContext _txnContext, _nonTransactedContext; /** * A context used by the message store enabling it to track context for a given channel even across thread @@ -113,9 +113,9 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); + //Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) throws AMQException { @@ -210,9 +210,9 @@ public class AMQChannel } else { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Content header received on channel " + _channelId); + _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } _currentMessage.setContentHeaderBody(contentHeaderBody); routeCurrentMessage(); @@ -234,9 +234,9 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Content body received on channel " + _channelId); + _log.trace(debugIdentity() + "Content body received on channel " + _channelId); } try { @@ -289,8 +289,10 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param queue the queue to subscribe to * @param session the protocol session of the subscriber - * @param noLocal - * @param exclusive + * @param noLocal Flag stopping own messages being receivied. + * @param exclusive Flag requesting exclusive access to the queue + * @param acks Are acks enabled for this subscriber + * @param filters Filters to apply to this subscriber * * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * @@ -327,6 +329,8 @@ public class AMQChannel /** * Called from the protocol session to close this channel and clean up. T * + * @param session The session to close + * * @throws AMQException if there is an error during closure */ public void close(AMQProtocolSession session) throws AMQException @@ -352,10 +356,23 @@ public class AMQChannel * @param message the message that was delivered * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the * delivery tag) + * @param consumerTag The tag for the consumer that is to acknowledge this message. * @param queue the queue from which the message was delivered */ public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) { + if (_log.isDebugEnabled()) + { + if (queue == null) + { + _log.debug("Adding unacked message with a null queue:" + message.debugIdentity()); + } + else + { + _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity()); + } + } + synchronized (_unacknowledgedMessageMap.getLock()) { _unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag)); @@ -363,8 +380,15 @@ public class AMQChannel } } + private final String id = "(" + System.identityHashCode(this) + ")"; + + public String debugIdentity() + { + return _channelId + id; + } + /** - * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to + * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to * this same channel or to other subscribers. * * @throws org.apache.qpid.AMQException if the requeue fails @@ -374,11 +398,22 @@ public class AMQChannel // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); - TransactionalContext nontransacted = null; + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } @@ -386,72 +421,130 @@ public class AMQChannel { if (unacked.queue != null) { - // Deliver these messages out of the transaction as their delivery was never - // part of the transaction only the receive. - if (!(_txnContext instanceof NonTransactionalContext)) - { - nontransacted.deliver(unacked.message, unacked.queue, false); - } - else - { - _txnContext.deliver(unacked.message, unacked.queue, false); - } + unacked.message.setRedelivered(true); + + // Deliver Message + deliveryContext.deliver(unacked.message, unacked.queue, false); + + // Should we allow access To the DM to directy deliver the message? + // As we don't need to check for Consumers or worry about incrementing the message count? +// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } } } + /** + * Requeue a single message + * + * @param deliveryTag The message to requeue + * + * @throws AMQException If something goes wrong. + */ public void requeue(long deliveryTag) throws AMQException { UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); if (unacked != null) { - TransactionalContext nontransacted = null; + + // Ensure message is released for redelivery + unacked.message.release(); + + // Mark message redelivered + unacked.message.setRedelivered(true); + + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } - if (!(_txnContext instanceof NonTransactionalContext)) + + if (unacked.queue != null) { - nontransacted.deliver(unacked.message, unacked.queue, false); + //Redeliver the messages to the front of the queue + deliveryContext.deliver(unacked.message, unacked.queue, true); + + unacked.message.decrementReference(_storeContext); } else { - _txnContext.deliver(unacked.message, unacked.queue, false); + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag + + " but no queue defined and no DeadLetter queue so DROPPING message."); +// _log.error("Requested requeue of message:" + deliveryTag + +// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); +// +// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); +// +// unacked.message.decrementReference(_storeContext); } - unacked.message.decrementReference(_storeContext); } else { - _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists"); + _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); + + if (_log.isDebugEnabled()) + { + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + int count = 0; + + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" + + "[" + message.deliveryTag + "]"); + return false; // Continue + } + + public void visitComplete() + { + + } + }); + } } } - /** Called to resend all outstanding unacknowledged messages to this same channel. - * @param session the session - * @param requeue if true then requeue, else resend - * @throws org.apache.qpid.AMQException */ - public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException + /** + * Called to resend all outstanding unacknowledged messages to this same channel. + * + * @param requeue Are the messages to be requeued or dropped. + * + * @throws AMQException When something goes wrong. + */ + public void resend(final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); if (_log.isInfoEnabled()) { - _log.info("unacked map contains " + _unacknowledgedMessageMap.size()); + _log.info("unacked map Size:" + _unacknowledgedMessageMap.size()); } + // Process the Unacked-Map. + // Marking messages who still have a consumer for to be resent + // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() { public boolean callback(UnacknowledgedMessage message) throws AMQException { - long deliveryTag = message.deliveryTag; AMQShortString consumerTag = message.consumerTag; AMQMessage msg = message.message; msg.setRedelivered(true); @@ -503,14 +596,21 @@ public class AMQChannel { if (!msgToResend.isEmpty()) { - _log.info("Preparing (" + msgToResend.size() + ") message to resend to."); + _log.info("Preparing (" + msgToResend.size() + ") message to resend."); + } + else + { + _log.info("No message to resend."); } } for (UnacknowledgedMessage message : msgToResend) { AMQMessage msg = message.message; - // Our Java Client will always suspend the channel when resending!! + // Our Java Client will always suspend the channel when resending! + // If the client has requested the messages be resent then it is + // their responsibility to ensure that thay are capable of receiving them + // i.e. The channel hasn't been server side suspended. // if (isSuspended()) // { // _log.info("Channel is suspended so requeuing"); @@ -518,50 +618,58 @@ public class AMQChannel // msgToRequeue.add(message); // } // else - { - //release to allow it to be delivered - msg.release(); +// { + //release to allow it to be delivered + msg.release(); - // Without any details from the client about what has been processed we have to mark - // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + // Without any details from the client about what has been processed we have to mark + // all messages in the unacked map as redelivered. + msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(); + Subscription sub = msg.getDeliveredSubscription(); - if (sub != null) + if (sub != null) + { + // Get the lock so we can tell if the sub scription has closed. + // will stop delivery to this subscription until the lock is released. + // note: this approach would allow the use of a single queue if the + // PreDeliveryQueue would allow head additions. + // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute.. + // needs guidance from AMQP WG Model SIG + synchronized (sub.getSendLock()) { - synchronized (sub.getSendLock()) + if (sub.isClosed()) { - if (sub.isClosed()) + if (_log.isDebugEnabled()) { - _log.info("Subscription closed during resend so requeuing message"); - //move this message to requeue - msgToRequeue.add(message); + _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message"); } - else + //move this message to requeue + msgToRequeue.add(message); + } + else + { + if (_log.isDebugEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend"); - } - // Will throw an exception if the sub is closed - sub.addToResendQueue(msg); - _unacknowledgedMessageMap.remove(message.deliveryTag); - // Don't decrement as we are bypassing the normal deliver which increments - // this is what there is a decrement on the Requeue as deliver will increment. - // msg.decrementReference(_storeContext); + _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub)); } + sub.addToResendQueue(msg); + _unacknowledgedMessageMap.remove(message.deliveryTag); + // Don't decrement as we are bypassing the normal deliver which increments + // this is why there is a decrement on the Requeue as deliver will increment. + // msg.decrementReference(_storeContext); } - } - else - { - _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); - //move this message to requeue - msgToRequeue.add(message); - } + } // sync(sub.getSendLock) } - } + else + { + _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); + //move this message to requeue + msgToRequeue.add(message); + } + } // for all messages +// } else !isSuspend if (_log.isInfoEnabled()) { @@ -571,26 +679,31 @@ public class AMQChannel } } - TransactionalContext nontransacted = null; + // Deliver these messages out of the transaction as their delivery was never + // part of the transaction only the receive. + TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } + + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; } // Process Messages to Requeue at the front of the queue for (UnacknowledgedMessage message : msgToRequeue) { - // Deliver these messages out of the transaction as their delivery was never - // part of the transaction only the receive. - if (!(_txnContext instanceof NonTransactionalContext)) - { - nontransacted.deliver(message.message, message.queue, true); - } - else - { - _txnContext.deliver(message.message, message.queue, true); - } + message.message.release(); + message.message.setRedelivered(true); + + deliveryContext.deliver(message.message, message.queue, true); _unacknowledgedMessageMap.remove(message.deliveryTag); message.message.decrementReference(_storeContext); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 3f2348b71b..940b5b2bf1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -42,6 +42,21 @@ public class UnacknowledgedMessage message.incrementReference(); } + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("Q:"); + sb.append(queue); + sb.append(" M:"); + sb.append(message); + sb.append(" CT:"); + sb.append(consumerTag); + sb.append(" DT:"); + sb.append(deliveryTag); + + return sb.toString(); + } + public void discard(StoreContext storeContext) throws AMQException { if (queue != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index 99cc60011a..30bbdea2ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -196,25 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap } } } - - public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException - { - synchronized (_lock) - { - for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet()) - { - long deliveryTag = entry.getKey(); - AMQShortString consumerTag = entry.getValue().consumerTag; - AMQMessage msg = entry.getValue().message; - - if(consumerTag != null) - { - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag); - } - } - } - } - + public UnacknowledgedMessage get(long key) { synchronized (_lock) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java index f93b2b25e6..a6972475a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java @@ -47,12 +47,13 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException { AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + BasicAckBody body = evt.getMethod(); if (_log.isDebugEnabled()) { - _log.debug("Ack received on channel " + evt.getChannelId()); + _log.debug("Ack(Tag:" + body.deliveryTag + ":Mult:" + body.multiple + ") received on channel " + evt.getChannelId()); } - BasicAckBody body = evt.getMethod(); + final AMQChannel channel = protocolSession.getChannel(evt.getChannelId()); if (channel == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java index bc11e4652c..a436c35473 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java @@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic throw body.getChannelNotFoundException(evt.getChannelId()); } - channel.resend(session, body.requeue); + channel.resend(body.requeue); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index ed13092ded..4e77a5e8b9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.BasicRejectBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -49,20 +50,67 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR { AMQProtocolSession session = stateManager.getProtocolSession(); - _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue); - int channelId = evt.getChannelId(); - UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag); - - _logger.info("Need to reject message:" + message); -// if (evt.getMethod().requeue) -// { -// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag); -// } -// else -// { -// // session.getChannel(channelId).resend(message); -// } + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channelId); + } + + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw evt.getMethod().getChannelNotFoundException(channelId); + } + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channel.debugIdentity()); + } + + long deliveryTag = evt.getMethod().deliveryTag; + + UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag); + + if (message == null) + { + _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); +// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known"); + } + else + { + + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() + + ": Requeue:" + evt.getMethod().requeue + +// ": Resend:" + evt.getMethod().resend + + " on channel:" + channel.debugIdentity()); + } + + // If we haven't requested message to be resent to this consumer then reject it from ever getting it. +// if (!evt.getMethod().resend) + { + message.message.reject(message.message.getDeliveredSubscription()); + } + + if (evt.getMethod().requeue) + { + channel.requeue(deliveryTag); + } + else + { + _logger.warn("Dropping message as requeue not required and there is no dead letter queue"); +// message.queue = channel.getDefaultDeadLetterQueue(); +// channel.requeue(deliveryTag); + } + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 9a8fce7129..777784ca30 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -63,6 +63,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos } session.closeChannel(channelId); + // Client requested closure so we don't wait for ok we send it + stateManager.getProtocolSession().closeChannelOk(channelId); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index a10f44f906..f747f7a840 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -63,7 +63,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod //Now resend all the unacknowledged messages back to the original subscribers. //(Must be done after the TxnRollback-ok response). // Why, are we not allowed to send messages back to client before the ok method? - channel.resend(session, false); + channel.resend(false); } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index d71f6e3046..133f4809b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -523,8 +523,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { try { - markChannelawaitingCloseOk(channelId); channel.close(this); + markChannelawaitingCloseOk(channelId); } finally { @@ -546,7 +546,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, /** * In our current implementation this is used by the clustering code. * - * @param channelId + * @param channelId The channel to remove */ public void removeChannel(int channelId) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index dedea68d18..6d375c89fe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -85,11 +85,18 @@ public class AMQMessage private Subscription _takenBySubcription; + private Set<Subscription> _rejectedBy = null; + public boolean isTaken() { return _taken.get(); } + public String debugIdentity() + { + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")"; + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -199,7 +206,7 @@ public class AMQMessage _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { - _log.debug("Message created with id " + messageId); + _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId); } } @@ -452,7 +459,9 @@ public class AMQMessage public void release() { + _log.trace("Releasing Message:" + debugIdentity()); _taken.set(false); + _takenBySubcription = null; } public boolean checkToken(Object token) @@ -511,7 +520,7 @@ public class AMQMessage * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered * to a consumer */ - public void checkDeliveredToConsumer() throws NoConsumersException, AMQException + public void checkDeliveredToConsumer() throws NoConsumersException { if (_immediate && !_deliveredToConsumer) @@ -580,7 +589,8 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { - _txnContext.deliver(this, q, true); + //normal deliver so add this message at the end. + _txnContext.deliver(this, q, false); } } finally @@ -801,7 +811,7 @@ public class AMQMessage public String toString() { - return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + _taken + " by:" + _takenBySubcription; } @@ -809,4 +819,35 @@ public class AMQMessage { return _takenBySubcription; } + + public void reject(Subscription subscription) + { + if (subscription != null) + { + if (_rejectedBy == null) + { + _rejectedBy = new HashSet<Subscription>(); + } + + _rejectedBy.add(subscription); + } + else + { + _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + } + } + + public boolean isRejectedBy(Subscription subscription) + { + boolean rejected = _rejectedBy != null; + + if (rejected) // We have subscriptions that rejected this message + { + return _rejectedBy.contains(subscription); + } + else // This messasge hasn't been rejected yet. + { + return rejected; + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 5bbe1671a7..7c2fe73386 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; */ public class AMQQueue implements Managable, Comparable { + public static final class ExistingExclusiveSubscription extends AMQException { @@ -446,7 +447,11 @@ public class AMQQueue implements Managable, Comparable setExclusive(true); } - debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " + + "consumer tag {2} with {3}", ps, channel, consumerTag, this)); + } Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal, this); @@ -486,8 +491,11 @@ public class AMQQueue implements Managable, Comparable public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException { - debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, - this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag, + this)); + } Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, @@ -506,6 +514,10 @@ public class AMQQueue implements Managable, Comparable // if we are eligible for auto deletion, unregister from the queue registry if (_autoDelete && _subscribers.isEmpty()) { + if (_logger.isInfoEnabled()) + { + _logger.warn("Auto-deleteing queue:" + this); + } autodelete(); // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared @@ -561,14 +573,18 @@ public class AMQQueue implements Managable, Comparable protected void autodelete() throws AMQException { - debug("autodeleting {0}", this); + if (_logger.isDebugEnabled()) + { + _logger.debug(MessageFormat.format("autodeleting {0}", this)); + } delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException + public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { //fixme not sure what this is doing. should we be passing deliverFirst through here? - _deliveryMgr.deliver(storeContext, getName(), msg, false); + // This code is not used so when it is perhaps it should + _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -582,6 +598,10 @@ public class AMQQueue implements Managable, Comparable } } +// public DeliveryManager getDeliveryManager() +// { +// return _deliveryMgr; +// } public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { @@ -673,14 +693,6 @@ public class AMQQueue implements Managable, Comparable return "Queue(" + _name + ")@" + System.identityHashCode(this); } - private void debug(String msg, Object... args) - { - if (_logger.isDebugEnabled()) - { - _logger.debug(MessageFormat.format(msg, args)); - } - } - public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException { return _deliveryMgr.performGet(session, channel, acks); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index e70926736d..601effcec7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.util.MessageQueue; import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; -import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; /** Manages delivery of messages on behalf of a queue */ @@ -86,6 +85,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private AtomicLong _totalMessageSize = new AtomicLong(); private AtomicInteger _extraMessages = new AtomicInteger(); private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>()); + private final Object _queueHeadLock = new Object(); + private String _processingThreadName = ""; ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue) { @@ -118,7 +119,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (deliverFirst) { - _messages.pushHead(msg); + synchronized (_queueHeadLock) + { + _messages.pushHead(msg); + } } else { @@ -367,16 +371,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long count = 0; _lock.lock(); - AMQMessage msg = getNextMessage(); - while (msg != null) + synchronized (_queueHeadLock) { - //mark this message as taken and get it removed - msg.taken(null); - _queue.dequeue(storeContext, msg); - msg = getNextMessage(); - count++; - } + AMQMessage msg = getNextMessage(); + while (msg != null) + { + //and remove it + _messages.poll(); + _queue.dequeue(storeContext, msg); + msg = getNextMessage(); + count++; + } + } _lock.unlock(); return count; } @@ -390,12 +397,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - - while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub))) + while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub))) { //remove the already taken message - messages.poll(); + AMQMessage removed = messages.poll(); + + assert removed == message; + _totalMessageSize.addAndGet(-message.getSize()); + + if (_log.isTraceEnabled()) + { + _log.trace("Removed taken message:" + message.debugIdentity()); + } + // try the next message message = messages.peek(); } @@ -409,7 +424,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (_log.isTraceEnabled()) { - _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) + + _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + ") AMQQueue (" + System.identityHashCode(queue) + ")"); } @@ -417,46 +432,63 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == null) { // There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector - if (_log.isDebugEnabled()) + if (_log.isInfoEnabled()) { - _log.debug(sub + ": asked to send messages but has none on given queue:" + queue); + _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue); } return; } AMQMessage message = null; + AMQMessage removed = null; try { - message = getNextMessage(messageQueue, sub); - - // message will be null if we have no messages in the messageQueue. - if (message == null) + synchronized (_queueHeadLock) { - if (_log.isTraceEnabled()) + message = getNextMessage(messageQueue, sub); + + // message will be null if we have no messages in the messageQueue. + if (message == null) { - _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + if (_log.isTraceEnabled()) + { + _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + } + return; + } + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) + + ") by :" + System.identityHashCode(this) + + ") to :" + System.identityHashCode(sub)); } - return; + + sub.send(message, _queue); + + //remove sent message from our queue. + removed = messageQueue.poll(); + //If we don't remove the message from _messages + // Otherwise the Async send will never end } + + if (removed != message) + { + _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed); + } + if (_log.isDebugEnabled()) { - _log.debug("Async Delivery Message (" + System.identityHashCode(message) + + _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() + ") by :" + System.identityHashCode(this) + ") to :" + System.identityHashCode(sub)); } - sub.send(message, _queue); - - //remove sent message from our queue. - messageQueue.poll(); - //If we don't remove the message from _messages - // Otherwise the Async send will never end if (messageQueue == sub.getResendQueue()) { if (_log.isTraceEnabled()) { - _log.trace("All messages sent from resendQueue for " + sub); + _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub); } if (messageQueue.isEmpty()) { @@ -469,7 +501,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else if (messageQueue == sub.getPreDeliveryQueue()) { - _log.info("We could do clean up of the main _message queue here"); + if (_log.isInfoEnabled()) + { + _log.info(debugIdentity() + "We could do clean up of the main _message queue here"); + } } _totalMessageSize.addAndGet(-message.getSize()); @@ -477,7 +512,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager catch (AMQException e) { message.release(); - _log.error("Unable to deliver message as dequeue failed: " + e, e); + _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e); } } @@ -516,6 +551,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager */ private void processQueue() { + //record thread name + if (_log.isDebugEnabled()) + { + _processingThreadName = Thread.currentThread().getName(); + } + // Continue to process delivery while we haveSubscribers and messages boolean hasSubscribers = _subscriptions.hasActiveSubscribers(); @@ -561,9 +602,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg); + _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg); } - msg.release(); + // This shouldn't be done here. +// msg.release(); //Check if we have someone to deliver the message to. _lock.lock(); @@ -575,7 +617,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); + _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); } if (!msg.getMessagePublishInfo().isImmediate()) { @@ -587,7 +629,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //Pre Deliver to all subscriptions if (_log.isDebugEnabled()) { - _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + + _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to:" + currentStatus()); } for (Subscription sub : _subscriptions.getSubscriptions()) @@ -598,7 +640,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); } continue; @@ -609,7 +651,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + + _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); } sub.enqueueForPreDelivery(msg, deliverFirst); @@ -625,9 +667,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + + _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } msg.taken(s); @@ -636,33 +678,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { - if (_log.isDebugEnabled()) + if (_log.isInfoEnabled()) { - _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send"); + _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + + "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); } } + } - if (!msg.isTaken()) + if (!msg.isTaken()) + { + if (_log.isInfoEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" + - " Subscriber:" + System.identityHashCode(s)); - } - - deliver(context, name, msg, deliverFirst); + _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + + " Subscriber:" + System.identityHashCode(s)); } - else + + deliver(context, name, msg, deliverFirst); + } + else + { + if (_log.isDebugEnabled()) { - if (_log.isDebugEnabled()) - { - _log.debug(id() + " Message(" + System.identityHashCode(msg) + - ") has been taken so disregarding deliver request to Subscriber:" + - System.identityHashCode(s)); - } + _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + + ") has been taken so disregarding deliver request to Subscriber:" + + System.identityHashCode(s)); } } } + } finally { @@ -674,10 +718,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - //fixme remove private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")"; - private String id() + private String debugIdentity() { return id; } @@ -710,7 +753,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug("Processing Async." + currentStatus()); + _log.debug(debugIdentity() + "Processing Async." + currentStatus()); } if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers()) @@ -725,14 +768,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager private String currentStatus() { - return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " + + return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") + + "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " + " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") + "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " + " Active:" + _subscriptions.hasActiveSubscribers() + - " Processing:" + _processing.get() + - " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") + - "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") "; + " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false"); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 0a2e73880c..20033daac7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -46,6 +46,8 @@ import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize; */ public class SubscriptionImpl implements Subscription { + + private static final Logger _suspensionlogger = Logger.getLogger("Suspension"); private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class); public final AMQChannel channel; @@ -258,6 +260,12 @@ public class SubscriptionImpl implements Subscription { channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); } + + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); } } @@ -265,56 +273,56 @@ public class SubscriptionImpl implements Subscription private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) throws AMQException { - try - { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) + { + if (_logger.isDebugEnabled()) { - if (_logger.isDebugEnabled()) - { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); - } - queue.dequeue(storeContext, msg); + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); } - synchronized (channel) - { - long deliveryTag = channel.getNextDeliveryTag(); - - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - msg.decrementReference(storeContext); - } + queue.dequeue(storeContext, msg); + } + synchronized (channel) + { + long deliveryTag = channel.getNextDeliveryTag(); - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + msg.decrementReference(storeContext); } - } - finally - { + + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + //Only set delivered if it actually was writen successfully.. + // using a try->finally would set it even if an error occured. msg.setDeliveredToConsumer(); } } public boolean isSuspended() { - if (_logger.isTraceEnabled()) + if (_suspensionlogger.isInfoEnabled()) { if (channel.isSuspended()) { - _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended"); + _suspensionlogger.info("Subscription(" + debugIdentity() + ") channel's is susupended"); } if (_sendLock.get()) { - _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing."); + _suspensionlogger.info("Subscription(" + debugIdentity() + ") has sendLock set so closing."); } } return channel.isSuspended() || _sendLock.get(); @@ -323,7 +331,7 @@ public class SubscriptionImpl implements Subscription /** * Callback indicating that a queue has been deleted. * - * @param queue + * @param queue The queue to delete */ public void queueDeleted(AMQQueue queue) throws AMQException { @@ -337,9 +345,18 @@ public class SubscriptionImpl implements Subscription public boolean hasInterest(AMQMessage msg) { + //check that the message hasn't been rejected + if (msg.isRejectedBy(this)) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity()); + } +// return false; + } + if (_noLocal) { - boolean isLocal; // We don't want local messages so check to see if message is one we sent Object localInstance; Object msgInstance; @@ -350,12 +367,12 @@ public class SubscriptionImpl implements Subscription if ((msg.getPublisher().getClientProperties() != null) && (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + if (localInstance == msgInstance || localInstance.equals(msgInstance)) { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + msg.debugIdentity() + ")"); } return false; } @@ -369,8 +386,8 @@ public class SubscriptionImpl implements Subscription { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + msg.debugIdentity() + ")"); } return false; } @@ -383,19 +400,26 @@ public class SubscriptionImpl implements Subscription if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); + _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity()); } return checkFilters(msg); } + private String id = String.valueOf(System.identityHashCode(this)); + + private String debugIdentity() + { + return id; + } + private boolean checkFilters(AMQMessage msg) { if (_filters != null) { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has filters."); + _logger.trace("(" + debugIdentity() + ") has filters."); } return _filters.allAllow(msg); } @@ -403,7 +427,7 @@ public class SubscriptionImpl implements Subscription { if (_logger.isTraceEnabled()) { - _logger.trace("(" + System.identityHashCode(this) + ") has no filters"); + _logger.trace("(" + debugIdentity() + ") has no filters"); } return true; @@ -445,15 +469,19 @@ public class SubscriptionImpl implements Subscription } _sendLock.set(true); - } + if (_logger.isInfoEnabled()) { - _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this); + _logger.info("Closing subscription (" + debugIdentity() + "):" + this); } if (_resendQueue != null && !_resendQueue.isEmpty()) { + if (_logger.isInfoEnabled()) + { + _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this); + } requeue(); } @@ -486,6 +514,11 @@ public class SubscriptionImpl implements Subscription { AMQMessage resent = _resendQueue.poll(); + if (_logger.isTraceEnabled()) + { + _logger.trace("Removed for resending:" + resent.debugIdentity()); + } + resent.release(); _queue.subscriberHasPendingResend(false, this, resent); @@ -495,7 +528,7 @@ public class SubscriptionImpl implements Subscription } catch (AMQException e) { - _logger.error("Unable to re-deliver messages", e); + _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e); } } diff --git a/java/client/pom.xml b/java/client/pom.xml index 617390c059..abac5b3f1a 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -55,6 +55,13 @@ <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> + <!-- commons collection exports log4j v1.2.7 which doesn't have trace()--> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89f596e541..61143eee69 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -198,9 +198,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private final Object _suspensionLock = new Object(); - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class); + private class Dispatcher extends Thread { @@ -212,12 +213,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public Dispatcher() { super("Dispatcher-Channel-" + _channelId); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " created"); + } } public void run() { + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " started"); + } + UnprocessedMessage message; + // Allow disptacher to start stopped + synchronized (_lock) + { + while (connectionStopped()) + { + try + { + _lock.wait(); + } + catch (InterruptedException e) + { + // ignore + } + } + } + try { while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null) @@ -243,10 +269,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } catch (InterruptedException e) { - ; + //ignore + } + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId); } - - _logger.info("Dispatcher thread terminating for channel " + _channelId); } // only call while holding lock @@ -263,6 +291,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi currently = _connectionStopped; _connectionStopped = connectionStopped; _lock.notify(); + + if (_dispatcherLogger.isDebugEnabled()) + { + _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") + + ": Currently " + (currently ? "Started" : "Stopped")); + } } return currently; } @@ -275,9 +309,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { - _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); - _logger.warn("Consumers that exist: " + _consumers); - _logger.warn("Session hashcode: " + System.identityHashCode(this)); + if (_dispatcherLogger.isInfoEnabled()) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)..."); + } + + rejectMessage(message, true); } else { @@ -311,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi rejectAllMessages(true); - _logger.debug("Session Pre Dispatch Queue cleared"); + _dispatcherLogger.debug("Session Pre Dispatch Queue cleared"); for (BasicMessageConsumer consumer : _consumers.values()) { @@ -323,20 +362,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void rejectPending(AMQShortString consumerTag) + public void rejectPending(BasicMessageConsumer consumer) { synchronized (_lock) { - boolean stopped = connectionStopped(); + boolean stopped = _dispatcher.connectionStopped(); - _dispatcher.setConnectionStopped(false); - - rejectMessagesForConsumerTag(consumerTag, true); - - if (stopped) + if (!stopped) { - _dispatcher.setConnectionStopped(stopped); + _dispatcher.setConnectionStopped(true); } + + // Reject messages on pre-receive queue + consumer.rollback(); + + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); + + // Remove consumer from map. + deregisterConsumer(consumer); + + _dispatcher.setConnectionStopped(stopped); + } } } @@ -549,14 +596,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi suspendChannel(true); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); - if (_dispatcher != null) { _dispatcher.rollback(); } + _connection.getProtocolHandler().syncWrite( + TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + + if (!isSuspended) { suspendChannel(false); @@ -663,14 +711,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi jmse = e; } } - finally + if (jmse != null) { - if (jmse != null) - { - throw jmse; - } + throw jmse; } - } @@ -835,6 +879,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.clearUnackedMessages(); } + if (_dispatcher != null) + { + _dispatcher.rollback(); + } + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -844,11 +893,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false) // requeue , BasicRecoverOkBody.class); - if (_dispatcher != null) - { - _dispatcher.rollback(); - } - if (!isSuspended) { suspendChannel(false); @@ -1223,35 +1267,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return (counter != null) && (counter.get() != 0); } - - public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException + public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException { - declareExchange(name, type, getProtocolHandler()); + declareExchange(name, type, getProtocolHandler(), nowait); } - public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException + private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { // TODO: Be aware of possible changes to parameter order as versions change. AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, @@ -1261,7 +1287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // durable name, // exchange false, // internal - false, // nowait + nowait, // nowait false, // passive getTicket(), // ticket type); // type @@ -1874,15 +1900,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { + startDistpatcherIfNecessary(false); + } + + synchronized void startDistpatcherIfNecessary(boolean initiallyStopped) + { if (_dispatcher == null) { _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); + _dispatcher.setConnectionStopped(initiallyStopped); _dispatcher.start(); } else { - _dispatcher.setConnectionStopped(false); + _dispatcher.setConnectionStopped(initiallyStopped); } } @@ -1910,7 +1942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler); + declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler); @@ -1950,12 +1982,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _destinationConsumerCount.remove(dest); } } - - //ensure we remove the messages from the consumer even if the dispatcher hasn't started - if (_dispatcher == null) - { - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - }// if the dispatcher is running we have to do the clean up in the Ok Handler. } } @@ -2033,6 +2059,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void confirmConsumerCancelled(AMQShortString consumerTag) { + + // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) { @@ -2040,26 +2068,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { consumer.closeWhenNoMessages(true); } + + //Clean the Maps up first + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } else { - consumer.rollback(); + _logger.info("Dispatcher is null so created stopped dispatcher"); + + startDistpatcherIfNecessary(true); } - } - //Flush any pending messages for this consumerTag - if (_dispatcher != null) - { - _dispatcher.rejectPending(consumerTag); + _dispatcher.rejectPending(consumer); } else { - rejectMessagesForConsumerTag(consumerTag, true); + _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); } + + } /* - * I could have combined the last 3 methods, but this way it improves readability - */ + * I could have combined the last 3 methods, but this way it improves readability + */ private AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) @@ -2189,16 +2224,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removing message from _queue:" + message); + _logger.debug("Removing message(" + System.identityHashCode(message) + + ") from _queue DT:" + message.getDeliverBody().deliveryTag); } messages.remove(); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message, requeue); - _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); + } } else { @@ -2207,15 +2246,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + public void rejectMessage(UnprocessedMessage message, boolean requeue) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + } + + rejectMessage(message.getDeliverBody().deliveryTag, requeue); + } + + public void rejectMessage(AbstractJMSMessage message, boolean requeue) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); + } + rejectMessage(message.getDeliveryTag(), requeue); + + } + public void rejectMessage(long deliveryTag, boolean requeue) { - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + if (_acknowledgeMode == CLIENT_ACKNOWLEDGE || + _acknowledgeMode == SESSION_TRANSACTED) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting delivery tag:" + deliveryTag); + } + AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, + getProtocolMajorVersion(), + getProtocolMinorVersion(), + deliveryTag, + requeue); - _connection.getProtocolHandler().writeFrame(basicRejectBody); + _connection.getProtocolHandler().writeFrame(basicRejectBody); + } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9b914425a..9043faa80c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.Iterator; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -109,9 +110,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ private int _outstanding; - /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */ - private long _lastDeliveryTag; - /** * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding * number of msgs >= _prefetchHigh and disabled at < _prefetchLow @@ -120,6 +118,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -432,6 +433,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing consumer:" + debugIdentity()); + } + synchronized (_connection.getFailoverMutex()) { if (!_closed.getAndSet(true)) @@ -448,6 +454,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("CancelOk'd for consumer:" + debugIdentity()); + } + } catch (AMQException e) { @@ -456,11 +468,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - deregisterConsumer(); - _unacknowledgedDeliveryTags.clear(); + //done in BasicCancelOK Handler + //deregisterConsumer(); if (_messageListener != null && _receiving.get()) { - _logger.info("Interrupting thread: " + _receivingThread); + if (_logger.isInfoEnabled()) + { + _logger.info("Interrupting thread: " + _receivingThread); + } _receivingThread.interrupt(); } } @@ -616,7 +631,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _lastDeliveryTag = msg.getDeliveryTag(); + _receivedDeliveryTags.add(msg.getDeliveryTag()); } break; } @@ -625,10 +640,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer /** Acknowledge up to last message delivered (if any). Used when commiting. */ void acknowledgeLastDelivered() { - if (_lastDeliveryTag > 0) + if (!_receivedDeliveryTags.isEmpty()) { - _session.acknowledgeMessage(_lastDeliveryTag, true); - _lastDeliveryTag = -1; + long lastDeliveryTag = _receivedDeliveryTags.poll(); + + while (!_receivedDeliveryTags.isEmpty()) + { + lastDeliveryTag = _receivedDeliveryTags.poll(); + } + + _session.acknowledgeMessage(lastDeliveryTag, true); } } @@ -738,43 +759,76 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void rollback() { + clearUnackedMessages(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting received messages"); + } + //rollback received but not committed messages + while (!_receivedDeliveryTags.isEmpty()) + { + Long tag = _receivedDeliveryTags.poll(); + + if (tag != null) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("Rejecting tag from _receivedDTs:" + tag); + } + + _session.rejectMessage(tag, true); + } + } + + //rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag); + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" + + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); + while (iterator.hasNext()) { - Object o = iterator.next(); + Object o = iterator.next(); if (o instanceof AbstractJMSMessage) { - _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true); + _session.rejectMessage(((AbstractJMSMessage) o), true); if (_logger.isTraceEnabled()) { - _logger.trace("Rejected message" + o); - iterator.remove(); + _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } + iterator.remove(); } else { _logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared"); + iterator.remove(); } } if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); + rollback(); } _synchronousQueue.clear(); } } + + public String debugIdentity() + { + return String.valueOf(_consumerTag); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index 9bd0205977..bd8177feb6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -28,27 +28,29 @@ import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.protocol.AMQMethodEvent; -/** - * @author Apache Software Foundation - */ public class BasicCancelOkMethodHandler implements StateAwareMethodListener { - private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); - private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } - public static BasicCancelOkMethodHandler getInstance() - { - return _instance; - } + public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + { + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - private BasicCancelOkMethodHandler() - { - } + if (_logger.isInfoEnabled()) + { + _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag); + } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New BasicCancelOk method received"); - BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); - } + protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index ddf79ec907..b176df87fe 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -30,13 +30,11 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; /** - * This class contains everything needed to process a JMS message. It assembles the - * deliver body, the content header and the content body/ies. - * - * Note that the actual work of creating a JMS message for the client code's use is done - * outside of the MINA dispatcher thread in order to minimise the amount of work done in - * the MINA dispatcher thread. + * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and + * the content body/ies. * + * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher + * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ public class UnprocessedMessage { @@ -47,9 +45,7 @@ public class UnprocessedMessage private final int _channelId; private ContentHeaderBody _contentHeader; - /** - * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general - */ + /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) @@ -74,9 +70,9 @@ public class UnprocessedMessage { final long payloadSize = body.payload.remaining(); - if(_bodies == null) + if (_bodies == null) { - if(payloadSize == getContentHeader().bodySize) + if (payloadSize == getContentHeader().bodySize) { _bodies = Collections.singletonList(body); } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index b2940d73ae..8a0b5e7d84 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -58,6 +58,7 @@ public class StateWaiter implements StateListener { _logger.debug("State " + _state + " not achieved so waiting..."); _monitor.wait(TIME_OUT); + //fixme this won't cause the timeout to exit the loop. need to set _throwable } catch (InterruptedException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 03e7d399ce..cb4ef01d25 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.client.util; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.log4j.Logger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 338404a431..4667a2b3fa 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -73,7 +73,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -130,7 +131,8 @@ public class RecoverTest extends TestCase Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); + // This is the default now AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 3431c56783..51bbe7d0e6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -109,6 +109,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (AMQException e) { + if (_logger.isInfoEnabled()) + { + _logger.info("Exception occured was:" + e.getErrorCode()); + } assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -315,15 +319,15 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con } catch (JMSException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (AMQException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } catch (URLSyntaxException e) { - fail("Creating new connection when:"+e.getMessage()); + fail("Creating new connection when:" + e.getMessage()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java new file mode 100644 index 0000000000..a56bae3d70 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.TestCase; + +import java.util.concurrent.atomic.AtomicInteger; + + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.TextMessage; +import javax.jms.MessageConsumer; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; +import org.apache.log4j.Logger; +import org.apache.log4j.Level; + +public class MessageRequeueTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class); + + protected static AtomicInteger consumerIds = new AtomicInteger(0); + protected final Integer numTestMessages = 150; + + protected final int consumeTimeout = 3000; + + protected final String queue = "direct://amq.direct//queue"; + protected String payload = "Message:"; + + protected final String BROKER = "vm://:1"; + private boolean testReception = true; + + private long[] receieved = new long[numTestMessages + 1]; + private boolean passed=false; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + // load test data + _logger.info("creating test data, " + numTestMessages + " messages"); + conn.put(queue, payload, numTestMessages); + // close this connection + conn.disconnect(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + + if (!passed) + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + // clear queue + conn.consume(queue, consumeTimeout); + } + TransportConnection.killVMBroker(1); + } + + /** multiple consumers */ + public void testDrain() throws JMSException, InterruptedException + { + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consuming queue " + queue); + Queue q = conn.getSession().createQueue(queue); + + final MessageConsumer consumer = conn.getSession().createConsumer(q); + int messagesReceived = 0; + + long messageLog[] = new long[numTestMessages + 1]; + + _logger.info("consuming..."); + Message msg = consumer.receive(1000); + while (msg != null) + { + messagesReceived++; + + long dt = ((AbstractJMSMessage) msg).getDeliveryTag(); + + int msgindex = msg.getIntProperty("index"); + if (messageLog[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + messageLog[msgindex] = dt; + + //get Next message + msg = consumer.receive(1000); + } + + conn.getSession().commit(); + consumer.close(); + assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); + + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : messageLog) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + + index++; + } + assertEquals(list.toString(), 0, failed); + _logger.info("consumed: " + messagesReceived); + conn.disconnect(); + } + + /** multiple consumers */ + public void testTwoCompetingConsumers() + { + Consumer c1 = new Consumer(); + Consumer c2 = new Consumer(); + Consumer c3 = new Consumer(); + Consumer c4 = new Consumer(); + + Thread t1 = new Thread(c1); + Thread t2 = new Thread(c2); + Thread t3 = new Thread(c3); + Thread t4 = new Thread(c4); + + t1.start(); +// t2.start(); +// t3.start(); +// t4.start(); + + try + { + t1.join(); + t2.join(); + t3.join(); + t4.join(); + } + catch (InterruptedException e) + { + fail("Uanble to join to Consumer theads"); + } + + _logger.info("consumer 1 count is " + c1.getCount()); + _logger.info("consumer 2 count is " + c2.getCount()); + _logger.info("consumer 3 count is " + c3.getCount()); + _logger.info("consumer 4 count is " + c4.getCount()); + + Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount(); + + // Check all messages were correctly delivered + int index = 0; + StringBuilder list = new StringBuilder(); + list.append("Failed to receive:"); + int failed = 0; + + for (long b : receieved) + { + if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + { + _logger.error("Index: " + index + " was not received."); + list.append(" "); + list.append(index); + list.append(":"); + list.append(b); + failed++; + } + index++; + } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); + assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); + passed=true; + } + + class Consumer implements Runnable + { + private Integer count = 0; + private Integer id; + + public Consumer() + { + id = consumerIds.addAndGet(1); + } + + public void run() + { + try + { + _logger.info("consumer-" + id + ": starting"); + QpidClientConnection conn = new QpidClientConnection(); + + conn.connect(); + + _logger.info("consumer-" + id + ": connected, consuming..."); + Message result; + do + { + result = conn.getNextMessage(queue, consumeTimeout); + if (result != null) + { + + long dt = ((AbstractJMSMessage) result).getDeliveryTag(); + + if (testReception) + { + int msgindex = result.getIntProperty("index"); + if (receieved[msgindex] != 0) + { + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + + ") more than once."); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + + "DT:" + dt + + "IN:" + msgindex); + } + + if (dt == 0) + { + _logger.error("DT is zero for msg:" + msgindex); + } + + receieved[msgindex] = dt; + } + + + count++; + if (count % 100 == 0) + { + _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); + } + } + } + while (result != null); + + _logger.info("consumer-" + id + ": complete"); + conn.disconnect(); + + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + public Integer getCount() + { + return count; + } + + public Integer getId() + { + return id; + } + } + + + public class QpidClientConnection implements ExceptionListener + { + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection() + { + super(); + setVirtualHost("/test"); + setBrokerList(BROKER); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } + } + + + public void testRequeue() throws JMSException, AMQException, URLSyntaxException + { + String virtualHost = "/test"; + String brokerlist = "vm://:1"; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q = session.createQueue(queue); + + _logger.info("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + // + } + + _logger.info("Receiving msg"); + Message msg = consumer.receive(); + + assertNotNull("Message should not be null", msg); + + _logger.info("Close Consumer"); + consumer.close(); + + _logger.info("Close Connection"); + conn.close(); + } + +}
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 07ef5f04d4..fb5ea58174 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -80,7 +80,8 @@ public class StreamMessageTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); + // This is the default now Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 0d75a6b968..2abc139ced 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -43,7 +43,8 @@ import javax.jms.TextMessage; public class CommitRollbackTest extends TestCase { protected AMQConnection conn; - protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; + protected static int testMethod = 0; protected String payload = "xyzzy"; private Session _session; private MessageProducer _publisher; @@ -57,6 +58,11 @@ public class CommitRollbackTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + + testMethod++; + queue += testMethod; + + newConnection(); } @@ -84,7 +90,11 @@ public class CommitRollbackTest extends TestCase TransportConnection.killVMBroker(1); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -109,7 +119,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and disconnected before commit, but is still present", result); } - /** PUT a text message, disconnect before commit, confirm it is gone. */ + /** + * PUT a text message, disconnect before commit, confirm it is gone. + * + * @throws Exception On error + */ public void testPutThenCloseDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -140,6 +154,8 @@ public class CommitRollbackTest extends TestCase /** * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different * session as producer + * + * @throws Exception On error */ public void testPutThenRollback() throws Exception { @@ -160,7 +176,11 @@ public class CommitRollbackTest extends TestCase assertNull("test message was put and rolled back, but is still present", result); } - /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */ + /** + * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection + * + * @throws Exception On error + */ public void testGetThenDisconnect() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -194,6 +214,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the * same connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseDisconnect() throws Exception { @@ -230,6 +252,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt * session to the producer + * + * @throws Exception On error */ public void testGetThenRollback() throws Exception { @@ -266,6 +290,8 @@ public class CommitRollbackTest extends TestCase /** * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same * connection but different session as producer + * + * @throws Exception On error */ public void testGetThenCloseRollback() throws Exception { @@ -304,7 +330,11 @@ public class CommitRollbackTest extends TestCase } - /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */ + /** + * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order + * + * @throws Exception On error + */ public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); @@ -339,37 +369,41 @@ public class CommitRollbackTest extends TestCase public void testSend2ThenCloseAfter1andTryAgain() throws Exception { -// assertTrue("session is not transacted", _session.getTransacted()); -// assertTrue("session is not transacted", _pubSession.getTransacted()); -// -// _logger.info("sending two test messages"); -// _publisher.send(_pubSession.createTextMessage("1")); -// _publisher.send(_pubSession.createTextMessage("2")); -// _pubSession.commit(); -// -// _logger.info("getting test message"); -// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText()); -// -// _consumer.close(); -// -// _consumer = _session.createConsumer(_jmsQueue); -// -// _logger.info("receiving result"); -// Message result = _consumer.receive(1000); -// _logger.error("1:" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("1" , ((TextMessage) result).getText()); -//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("2" + result); -//// assertNotNull("test message was consumed and rolled back, but is gone", result); -//// assertEquals("2", ((TextMessage) result).getText()); -//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); -// -// result = _consumer.receive(1000); -// _logger.error("3" + result); -// assertNull("test message should be null:" + result, result); + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("sending two test messages"); + _publisher.send(_pubSession.createTextMessage("1")); + _publisher.send(_pubSession.createTextMessage("2")); + _pubSession.commit(); + + _logger.info("getting test message"); + Message result = _consumer.receive(1000); + + assertNotNull("Message received should not be null", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered()); + + + _logger.info("Closing Consumer"); + _consumer.close(); + + _logger.info("Creating New consumer"); + _consumer = _session.createConsumer(_jmsQueue); + + _logger.info("receiving result"); + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("1", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNotNull("test message was consumed and rolled back, but is gone", result); + assertEquals("2", ((TextMessage) result).getText()); + assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + + result = _consumer.receive(1000); + assertNull("test message should be null:" + result, result); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 94cbb426e5..d994d4c141 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -62,69 +62,125 @@ public class TransactedTest extends TestCase { super.setUp(); TransportConnection.createVMBroker(1); + _logger.info("Create Connection"); con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); + + _logger.info("Create Session"); session = con.createSession(true, Session.SESSION_TRANSACTED); + _logger.info("Create Q1"); queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + _logger.info("Create Q2"); queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false); - + _logger.info("Create Consumer of Q1"); consumer1 = session.createConsumer(queue1); - //Dummy just to create the queue. + //Dummy just to create the queue. + _logger.info("Create Consumer of Q2"); MessageConsumer consumer2 = session.createConsumer(queue2); + _logger.info("Close Consumer of Q2"); consumer2.close(); + + _logger.info("Create producer to Q2"); producer2 = session.createProducer(queue2); + + _logger.info("Start Connection"); con.start(); + _logger.info("Create prep connection"); prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); + + _logger.info("Create prep session"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + + _logger.info("Create prep producer to Q1"); prepProducer1 = prepSession.createProducer(queue1); + + _logger.info("Create prep connection start"); prepCon.start(); - //add some messages - prepProducer1.send(prepSession.createTextMessage("A")); - prepProducer1.send(prepSession.createTextMessage("B")); - prepProducer1.send(prepSession.createTextMessage("C")); + _logger.info("Create test connection"); testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); + _logger.info("Create test session"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + _logger.info("Create test consumer of q2"); testConsumer2 = testSession.createConsumer(queue2); - } protected void tearDown() throws Exception { + _logger.info("Close connection"); con.close(); + _logger.info("Close test connection"); testCon.close(); + _logger.info("Close prep connection"); prepCon.close(); + _logger.info("Kill broker"); TransportConnection.killAllVMBrokers(); super.tearDown(); } public void testCommit() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + //send and receive some messages + _logger.info("Send X to Q2"); producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); producer2.send(session.createTextMessage("Z")); + + + _logger.info("Read A from Q1"); expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); expect("C", consumer1.receive(1000)); //commit + _logger.info("session commit"); session.commit(); + _logger.info("Start test Connection"); testCon.start(); + //ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); expect("Z", testConsumer2.receive(1000)); + _logger.info("create test session on Q1"); testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); assertTrue(null == testConsumer2.receive(1000)); } public void testRollback() throws Exception { + //add some messages + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + //Quick sleep to ensure all three get pre-fetched + Thread.sleep(500); + _logger.info("Sending X Y Z"); producer2.send(session.createTextMessage("X")); producer2.send(session.createTextMessage("Y")); @@ -140,9 +196,9 @@ public class TransactedTest extends TestCase _logger.info("Receiving A B C"); //ensure sent messages are not visible and received messages are requeued - expect("A", consumer1.receive(1000)); - expect("B", consumer1.receive(1000)); - expect("C", consumer1.receive(1000)); + expect("A", consumer1.receive(1000), true); + expect("B", consumer1.receive(1000), true); + expect("C", consumer1.receive(1000), true); _logger.info("Starting new connection"); testCon.start(); @@ -152,20 +208,22 @@ public class TransactedTest extends TestCase assertTrue(null == testConsumer2.receive(1000)); session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); } public void testResendsMsgsAfterSessionClose() throws Exception { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); - Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); MessageConsumer consumer = consumerSession.createConsumer(queue3); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); - Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); MessageProducer producer = producerSession.createProducer(queue3); _logger.info("Sending four messages"); @@ -176,65 +234,77 @@ public class TransactedTest extends TestCase producerSession.commit(); - _logger.info("Starting connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Received and acknowledged first message"); + _logger.info("Received and committed first message"); tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg2", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg3", tm.getText()); + tm = (TextMessage) consumer.receive(1000); assertNotNull(tm); + assertEquals("msg4", tm.getText()); + _logger.info("Received all four messages. Closing connection with three outstanding messages"); consumerSession.close(); - consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); consumer = consumerSession.createConsumer(queue3); // no ack for last three messages so when I call recover I expect to get three messages back - tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); tm = (TextMessage) consumer.receive(3000); assertNotNull(tm); assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + + _logger.info("Received redelivery of three messages. Committing"); - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); consumerSession.commit(); - _logger.info("Calling acknowledge with no outstanding messages"); - // all acked so no messages to be delivered + _logger.info("Called commit"); - tm = (TextMessage) consumer.receiveNoWait(); + tm = (TextMessage) consumer.receive(1000); assertNull(tm); + _logger.info("No messages redelivered as is expected"); con.close(); con2.close(); - } - private void expect(String text, Message msg) throws JMSException { + expect(text, msg, false); + } + + private void expect(String text, Message msg, boolean requeued) throws JMSException + { assertNotNull("Message should not be null", msg); assertTrue("Message should be a text message", msg instanceof TextMessage); assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText()); + assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered()); } public static junit.framework.Test suite() diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java index 70209cd2a3..86cde3cee7 100644 --- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java +++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java @@ -37,7 +37,7 @@ public class SimpleClusterTest extends TestCase AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test"); AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE); System.out.println("Session created"); - session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct")); + session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true); System.out.println("Exchange declared"); con.close(); System.out.println("Connection closed"); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java index ed244396bf..379f7feb4f 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java @@ -100,7 +100,7 @@ public final class AMQConstant public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true); - public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true); + public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true); public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true); diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java index cdf686b4cb..883d5018cd 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -41,6 +41,11 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ return super.size() + _messageHeadSize.get(); } + public int headSize() + { + return _messageHeadSize.get(); + } + @Override public E poll() { @@ -50,10 +55,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ } else { - _logger.debug("Providing item from message head"); - E e = _messageHead.poll(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Providing item(" + e + ")from message head"); + } + + if (e != null) { _messageHeadSize.decrementAndGet(); @@ -159,8 +168,12 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ } else { - _logger.debug("Providing item from message head"); - return _messageHead.peek(); + E o = _messageHead.peek(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Peeking item (" + o + ") from message head"); + } + return o; } } @@ -186,7 +199,10 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ public boolean pushHead(E o) { - _logger.debug("Adding item to head of queue"); + if (_logger.isDebugEnabled()) + { + _logger.debug("Adding item(" + o + ") to head of queue"); + } if (_messageHead.offer(o)) { _messageHeadSize.incrementAndGet(); diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index c8271f1549..87491ed3d3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -67,7 +67,8 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
|
