diff options
Diffstat (limited to 'java')
31 files changed, 1191 insertions, 515 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5dd6619cff..1ebe5fa0a2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -319,6 +319,25 @@ public class AMQChannel public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size()); + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug(message); + + return true; + } + + public void visitComplete() + { + } + }); + } + AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) { @@ -342,9 +361,23 @@ public class AMQChannel private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException { - _log.info("Unsubscribing all consumers on channel " + toString()); + if (_log.isInfoEnabled()) + { + if (!_consumerTag2QueueMap.isEmpty()) + { + _log.info("Unsubscribing all consumers on channel " + toString()); + } + else + { + _log.info("No consumers to unsubscribe on channel " + toString()); + } + } for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet()) { + if (_log.isInfoEnabled()) + { + _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); + } me.getValue().unregisterProtocolSession(session, _channelId, me.getKey()); } _consumerTag2QueueMap.clear(); @@ -369,7 +402,11 @@ public class AMQChannel } else { - _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity()); + if (_log.isDebugEnabled()) + { + _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + + ") with a queue(" + queue + ") for " + consumerTag); + } } } @@ -395,25 +432,38 @@ public class AMQChannel */ public void requeue() throws AMQException { + if (_log.isInfoEnabled()) + { + _log.info("Requeuing for " + toString()); + } + // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); + if (_log.isDebugEnabled()) + { + _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages."); + } // Deliver these messages out of the transaction as their delivery was never // part of the transaction only the receive. - TransactionalContext deliveryContext; - if (!(_txnContext instanceof NonTransactionalContext)) + TransactionalContext deliveryContext = null; + + if (!messagesToBeDelivered.isEmpty()) { - if (_nonTransactedContext == null) + if (!(_txnContext instanceof NonTransactionalContext)) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); - } +// if (_nonTransactedContext == null) + { + _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, + _returnMessages, _browsedAcks); + } - deliveryContext = _nonTransactedContext; - } - else - { - deliveryContext = _txnContext; + deliveryContext = _nonTransactedContext; + } + else + { + deliveryContext = _txnContext; + } } @@ -421,6 +471,10 @@ public class AMQChannel { if (unacked.queue != null) { + // Ensure message is released for redelivery + unacked.message.release(); + + // Mark message redelivered unacked.message.setRedelivered(true); // Deliver Message @@ -459,7 +513,7 @@ public class AMQChannel TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { - if (_nonTransactedContext == null) +// if (_nonTransactedContext == null) { _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); @@ -472,13 +526,12 @@ public class AMQChannel deliveryContext = _txnContext; } - if (unacked.queue != null) { //Redeliver the messages to the front of the queue deliveryContext.deliver(unacked.message, unacked.queue, true); - - unacked.message.decrementReference(_storeContext); + //Deliver increments the message count but we have already deliverted this once so don't increment it again + // this was because deliver did an increment changed this. } else { @@ -489,7 +542,6 @@ public class AMQChannel // // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); // -// unacked.message.decrementReference(_storeContext); } } else @@ -656,15 +708,16 @@ public class AMQChannel } sub.addToResendQueue(msg); _unacknowledgedMessageMap.remove(message.deliveryTag); - // Don't decrement as we are bypassing the normal deliver which increments - // this is why there is a decrement on the Requeue as deliver will increment. - // msg.decrementReference(_storeContext); } } // sync(sub.getSendLock) } else { - _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss"); + + if (_log.isInfoEnabled()) + { + _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss"); + } //move this message to requeue msgToRequeue.add(message); } @@ -706,7 +759,6 @@ public class AMQChannel deliveryContext.deliver(message.message, message.queue, true); _unacknowledgedMessageMap.remove(message.deliveryTag); - message.message.decrementReference(_storeContext); } } @@ -760,8 +812,18 @@ public class AMQChannel { synchronized (_unacknowledgedMessageMap.getLock()) { + if (_log.isDebugEnabled()) + { + _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size()); + } + _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext); checkSuspension(); + if (_log.isDebugEnabled()) + { + _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size()); + } + } } @@ -775,12 +837,6 @@ public class AMQChannel return _unacknowledgedMessageMap; } - public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue) - { - _browsedAcks.add(deliveryTag); - addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - } - private void checkSuspension() { boolean suspend; diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 820f0122f5..fb16267d97 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -37,6 +37,10 @@ public abstract class RequiredDeliveryException extends AMQException { super(message); _amqMessage = payload; + // Increment the reference as this message is in the routing phase + // and so will have the ref decremented as routing fails. + // we need to keep this message around so we can return it in the + // handler. So increment here. payload.incrementReference(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index c987c12154..aac9408247 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -101,6 +101,8 @@ public class TxAck implements TxnOp for (UnacknowledgedMessage msg : _unacked) { msg.restoreTransientMessageData(); + + //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); } } @@ -124,7 +126,7 @@ public class TxAck implements TxnOp _map.remove(_unacked); for (UnacknowledgedMessage msg : _unacked) { - msg.clearTransientMessageData(); + msg.clearTransientMessageData(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 940b5b2bf1..b8c5e821f7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -39,7 +39,6 @@ public class UnacknowledgedMessage this.message = message; this.consumerTag = consumerTag; this.deliveryTag = deliveryTag; - message.incrementReference(); } public String toString() @@ -63,6 +62,7 @@ public class UnacknowledgedMessage { message.dequeue(storeContext, queue); } + //if the queue is null then the message is waiting to be acked, but has been removed. message.decrementReference(storeContext); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java index 7d18043f5c..8bab96a11b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java @@ -29,9 +29,12 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.log4j.Logger; public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody> { + private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class); + private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler(); public static BasicCancelMethodHandler getInstance() @@ -55,6 +58,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC throw body.getChannelNotFoundException(evt.getChannelId()); } + if (_log.isDebugEnabled()) + { + _log.debug("BasicCancel: for:" + body.consumerTag + + " nowait:" + body.nowait); + } + channel.unsubscribeConsumer(protocolSession, body.consumerTag); if (!body.nowait) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index da61f2ffd5..56eae279dc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -25,6 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.BasicConsumeOkBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; @@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } else { + if (_log.isDebugEnabled()) + { + _log.debug("BasicConsume: from '" + body.queue + + "' for:" + body.consumerTag + + " nowait:" + body.nowait + + " args:" + body.arguments); + } AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue); if (queue == null) { - _log.info("No queue for '" + body.queue + "'"); + if (_log.isTraceEnabled()) + { + _log.trace("No queue for '" + body.queue + "'"); + } if (body.queue != null) { String msg = "No such queue, '" + body.queue + "'"; @@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } catch (org.apache.qpid.AMQInvalidArgumentException ise) { - _log.info("Closing connection due to invalid selector"); - throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage()); + _log.debug("Closing connection due to invalid selector"); + // Why doesn't this ChannelException work. +// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage()); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, + (byte) 8, (byte) 0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId + BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId + AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode + new AMQShortString(ise.getMessage()))); // replyText } catch (ConsumerTagNotUniqueException e) { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'"); - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Non-unique consumer tag, '" + body.consumerTag + "'"); + // If the above doesn't work then perhaps this is wrong too. +// throw body.getConnectionException(AMQConstant.NOT_ALLOWED, +// "Non-unique consumer tag, '" + body.consumerTag + "'"); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) + // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. + // Be aware of possible changes to parameter order as versions change. + session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, + (byte)8, (byte)0, // AMQP version (major, minor) + BasicConsumeBody.getClazz((byte)8, (byte)0), // classId + BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId + AMQConstant.NOT_ALLOWED.getCode(), // replyCode + msg)); // replyText } catch (AMQQueue.ExistingExclusiveSubscription e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 4e77a5e8b9..14687c40ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -52,13 +52,13 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR int channelId = evt.getChannelId(); - if (_logger.isTraceEnabled()) - { - _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + - ": Requeue:" + evt.getMethod().requeue + -// ": Resend:" + evt.getMethod().resend + - " on channel:" + channelId); - } +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Rejecting:" + evt.getMethod().deliveryTag + +// ": Requeue:" + evt.getMethod().requeue + +//// ": Resend:" + evt.getMethod().resend + +// " on channel:" + channelId); +// } AMQChannel channel = session.getChannel(channelId); @@ -67,9 +67,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR throw evt.getMethod().getChannelNotFoundException(channelId); } - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting:" + evt.getMethod().deliveryTag + + _logger.debug("Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue + // ": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 777784ca30..1f4f1f9221 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -51,8 +51,11 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos { AMQProtocolSession session = stateManager.getProtocolSession(); ChannelCloseBody body = evt.getMethod(); - _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + - " and method " + body.methodId); + if (_logger.isInfoEnabled()) + { + _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + + " and method " + body.methodId); + } int channelId = evt.getChannelId(); AMQChannel channel = session.getChannel(channelId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 21da03d226..b086cad67f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; -public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> +public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody> { private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); @@ -49,8 +49,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C { AMQProtocolSession session = stateManager.getProtocolSession(); final ConnectionCloseBody body = evt.getMethod(); - _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + session); + if (_logger.isInfoEnabled()) + { + _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + + body.replyText + " for " + session); + } try { session.closeSession(); @@ -62,7 +65,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 03c7051aac..d8b7814d31 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -176,6 +176,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } else { + _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); + // Be aware of possible changes to parameter order as versions change. protocolSession.write(ConnectionCloseBody.createAMQFrame(0, session.getProtocolMajorVersion(), @@ -185,7 +187,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter 200, // replyCode new AMQShortString(throwable.getMessage()) // replyText )); - _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); protocolSession.close(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 6d375c89fe..cdf316f2d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -45,9 +45,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -/** - * Combines the information that make up a deliverable message into a more manageable form. - */ +/** Combines the information that make up a deliverable message into a more manageable form. */ public class AMQMessage { private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -92,9 +90,10 @@ public class AMQMessage return _taken.get(); } + private final int hashcode = System.identityHashCode(this); public String debugIdentity() { - return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")"; + return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } /** @@ -206,7 +205,7 @@ public class AMQMessage _taken = new AtomicBoolean(false); if (_log.isDebugEnabled()) { - _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId); + _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")"); } } @@ -363,7 +362,7 @@ public class AMQMessage if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); + _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } } @@ -374,6 +373,7 @@ public class AMQMessage * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed + * @param storeContext */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { @@ -387,9 +387,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); - - + _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); } // must check if the handle is null since there may be cases where we decide to throw away a message @@ -410,7 +408,7 @@ public class AMQMessage { if (_log.isDebugEnabled()) { - _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4)); + _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5)); if (_referenceCount.get() < 0) { Thread.dumpStack(); @@ -418,7 +416,7 @@ public class AMQMessage } if (_referenceCount.get() < 0) { - throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0."); + throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0."); } } } @@ -459,7 +457,10 @@ public class AMQMessage public void release() { - _log.trace("Releasing Message:" + debugIdentity()); + if (_log.isTraceEnabled()) + { + _log.trace("Releasing Message:" + debugIdentity()); + } _taken.set(false); _takenBySubcription = null; } @@ -572,7 +573,7 @@ public class AMQMessage List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues(); if (_log.isDebugEnabled()) { - _log.debug("Delivering message " + _messageId + " to " + destinationQueues); + _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues); } try { @@ -589,6 +590,8 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { + //Increment the references to this message for each queue delivery. + incrementReference(); //normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } @@ -596,6 +599,7 @@ public class AMQMessage finally { destinationQueues.clear(); + // Remove refence for routing process . Reference count should now == delivered queue count decrementReference(storeContext); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 7c2fe73386..78f144703b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -516,7 +516,7 @@ public class AMQQueue implements Managable, Comparable { if (_logger.isInfoEnabled()) { - _logger.warn("Auto-deleteing queue:" + this); + _logger.info("Auto-deleteing queue:" + this); } autodelete(); // we need to manually fire the event to the removed subscription (which was the last one left for this @@ -624,7 +624,6 @@ public class AMQQueue implements Managable, Comparable try { msg.dequeue(storeContext, this); - msg.decrementReference(storeContext); } catch (MessageCleanupException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 87868f0b25..6122d191f8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -383,6 +383,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return count; } + /** + This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. + */ private AMQMessage getNextMessage() throws AMQException { return getNextMessage(_messages, null); @@ -392,13 +395,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { AMQMessage message = messages.peek(); - while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub))) + //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.) + while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub)) { //remove the already taken message AMQMessage removed = messages.poll(); assert removed == message; - + _totalMessageSize.addAndGet(-message.getSize()); if (_log.isTraceEnabled()) @@ -494,7 +498,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _extraMessages.decrementAndGet(); } - else if (messageQueue == sub.getPreDeliveryQueue()) + else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser()) { if (_log.isInfoEnabled()) { @@ -695,7 +699,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + + _log.debug(debugIdentity() + " Message(" + msg.toString() + ") has been taken so disregarding deliver request to Subscriber:" + System.identityHashCode(s)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 20033daac7..d3578d39e8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -256,10 +256,10 @@ public class SubscriptionImpl implements Subscription // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client // received the message. If it is lost in transit that is not important. - if (_acks) - { - channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); - } +// if (_acks) +// { +// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue); +// } if (_sendLock.get()) { @@ -273,41 +273,49 @@ public class SubscriptionImpl implements Subscription private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue) throws AMQException { - // if we do not need to wait for client acknowledgements - // we can decrement the reference count immediately. + try + { // if we do not need to wait for client acknowledgements + // we can decrement the reference count immediately. - // By doing this _before_ the send we ensure that it - // doesn't get sent if it can't be dequeued, preventing - // duplicate delivery on recovery. + // By doing this _before_ the send we ensure that it + // doesn't get sent if it can't be dequeued, preventing + // duplicate delivery on recovery. - // The send may of course still fail, in which case, as - // the message is unacked, it will be lost. - if (!_acks) - { - if (_logger.isDebugEnabled()) + // The send may of course still fail, in which case, as + // the message is unacked, it will be lost. + if (!_acks) { - _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + if (_logger.isDebugEnabled()) + { + _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId()); + } + queue.dequeue(storeContext, msg); } - queue.dequeue(storeContext, msg); - } - synchronized (channel) - { - long deliveryTag = channel.getNextDeliveryTag(); - if (_sendLock.get()) + synchronized (channel) { - _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); - } + long deliveryTag = channel.getNextDeliveryTag(); - if (_acks) - { - channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); - msg.decrementReference(storeContext); - } + if (_sendLock.get()) + { + _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!"); + } - protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + if (_acks) + { + channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue); + } + + protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); + + } + } + finally + { //Only set delivered if it actually was writen successfully.. - // using a try->finally would set it even if an error occured. + // using a try->finally would set it even if an error occured. + // Is this what we want? + msg.setDeliveredToConsumer(); } } @@ -461,14 +469,25 @@ public class SubscriptionImpl implements Subscription public void close() { + boolean closed = false; synchronized (_sendLock) { if (_logger.isDebugEnabled()) { - _logger.debug("Setting SendLock true"); + _logger.debug("Setting SendLock true:" + debugIdentity()); + } + + closed = _sendLock.getAndSet(true); + } + + if (closed) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Called close() on a closed subscription"); } - _sendLock.set(true); + return; } if (_logger.isInfoEnabled()) @@ -488,16 +507,36 @@ public class SubscriptionImpl implements Subscription //remove references in PDQ if (_messages != null) { + if (_logger.isInfoEnabled()) + { + _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this); + } + _messages.clear(); } + } + + private void autoclose() + { + close(); if (_autoClose && !_sentClose) { - _logger.info("Closing autoclose subscription:" + this); + _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this); + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); - _sentClose = true; + + //fixme JIRA do this better + try + { + channel.unsubscribeConsumer(protocolSession, consumerTag); + } + catch (AMQException e) + { + // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag. + } } } @@ -590,7 +629,7 @@ public class SubscriptionImpl implements Subscription { if (_messages.isEmpty()) { - close(); + autoclose(); return null; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index e5cce672f6..cf0da55f2a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -100,7 +100,7 @@ public class LocalTransactionalContext implements TransactionalContext // be added for every queue onto which the message is // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. - message.incrementReference(); +// message.incrementReference(); _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue)); diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 19146da22e..181dfa3a80 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -93,7 +93,6 @@ public class NonTransactionalContext implements TransactionalContext { try { - message.incrementReference(); queue.process(_storeContext, message, deliverFirst); //following check implements the functionality //required by the 'immediate' flag: @@ -128,6 +127,8 @@ public class NonTransactionalContext implements TransactionalContext { _log.debug("Discarding message: " + message.message.getMessageId()); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); } else @@ -160,6 +161,8 @@ public class NonTransactionalContext implements TransactionalContext { _log.debug("Discarding message: " + msg.message.getMessageId()); } + + //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(_storeContext); } else @@ -181,7 +184,22 @@ public class NonTransactionalContext implements TransactionalContext throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channel.getChannelId()); } - msg.discard(_storeContext); + + if (!_browsedAcks.contains(deliveryTag)) + { + if (_log.isDebugEnabled()) + { + _log.debug("Discarding message: " + msg.message.getMessageId()); + } + + //Message has been ack so discard it. This will dequeue and decrement the reference. + msg.discard(_storeContext); + } + else + { + _browsedAcks.remove(deliveryTag); + } + if (_log.isDebugEnabled()) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java index d04b93a469..339ca8ae1a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java @@ -27,10 +27,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; -/** - * Holds a list of TxnOp instance representing transactional - * operations. - */ +/** Holds a list of TxnOp instance representing transactional operations. */ public class TxnBuffer { private final List<TxnOp> _ops = new ArrayList<TxnOp>(); @@ -42,6 +39,11 @@ public class TxnBuffer public void commit(StoreContext context) throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray()); + } + if (prepare(context)) { for (TxnOp op : _ops) @@ -64,7 +66,7 @@ public class TxnBuffer catch (Exception e) { //compensate previously prepared ops - for(int j = 0; j < i; j++) + for (int j = 0; j < i; j++) { _ops.get(j).undoPrepare(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 053d380129..4662f80c5b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -49,6 +49,7 @@ public class AMQQueueBrowser implements QueueBrowser _session = session; _queue = queue; _messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector; + // Create Consumer to verify message selector. BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); consumer.close(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 61143eee69..184bc44912 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -25,6 +25,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -294,8 +295,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") + - ": Currently " + (currently ? "Started" : "Stopped")); + _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") + + ": Currently " + (currently ? "Stopped" : "Started")); } } return currently; @@ -307,22 +308,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); - if (consumer == null) + if (consumer == null || consumer.isClosed()) { if (_dispatcherLogger.isInfoEnabled()) { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + - "[" + message.getDeliverBody().deliveryTag + "] from queue " - + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)..."); + if (consumer == null) + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + message.getDeliverBody().consumerTag + + " )without a handler - rejecting(requeue)..."); + } + else + { + _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + + "[" + message.getDeliverBody().deliveryTag + "] from queue " + + " consumer(" + consumer.debugIdentity() + + ") is closed rejecting(requeue)..."); + } } rejectMessage(message, true); } else { - consumer.notifyMessage(message, _channelId); - } } } @@ -354,7 +364,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (BasicMessageConsumer consumer : _consumers.values()) { - consumer.rollback(); + if (!consumer.isNoConsume()) + { + consumer.rollback(); + } + else + { + // should perhaps clear the _SQ here. + //consumer._synchronousQueue.clear(); + consumer.clearReceiveQueue(); + } + + } setConnectionStopped(isStopped); @@ -379,8 +400,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag(), true); - // Remove consumer from map. - deregisterConsumer(consumer); + // closeConsumer + consumer.markClosed(); _dispatcher.setConnectionStopped(stopped); @@ -624,6 +645,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void close(long timeout) throws JMSException { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + } + // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session synchronized (_connection.getFailoverMutex()) @@ -2063,26 +2089,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Remove the consumer from the map BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); if (consumer != null) - { - if (consumer.isAutoClose()) + { +// fixme this isn't right.. needs to check if _queue contains data for this consumer + if (consumer.isAutoClose())// && _queue.isEmpty()) { consumer.closeWhenNoMessages(true); } - //Clean the Maps up first - //Flush any pending messages for this consumerTag - if (_dispatcher != null) + if (!consumer.isNoConsume()) { - _logger.info("Dispatcher is not null"); + //Clean the Maps up first + //Flush any pending messages for this consumerTag + if (_dispatcher != null) + { + _logger.info("Dispatcher is not null"); + } + else + { + _logger.info("Dispatcher is null so created stopped dispatcher"); + + startDistpatcherIfNecessary(true); + } + + _dispatcher.rejectPending(consumer); } else { - _logger.info("Dispatcher is null so created stopped dispatcher"); + //Just close the consumer + //fixme the CancelOK is being processed before the arriving messages.. + // The dispatcher is still to process them so the server sent in order but the client + // has yet to receive before the close comes in. - startDistpatcherIfNecessary(true); +// consumer.markClosed(); } - - _dispatcher.rejectPending(consumer); } else { @@ -2217,7 +2256,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue) { Iterator messages = _queue.iterator(); + if (_logger.isInfoEnabled()) + { + _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + + ") (PDispatchQ) requeue:" + requeue); + if (messages.hasNext()) + { + _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); + } + else + { + _logger.info("No messages in _queue to reject"); + } + } while (messages.hasNext()) { UnprocessedMessage message = (UnprocessedMessage) messages.next(); @@ -2239,10 +2291,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } } - else - { - _logger.error("Pruned pending message for consumer:" + consumerTag); - } } } @@ -2250,9 +2298,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(UnprocessedMessage message, boolean requeue) { - if (_logger.isDebugEnabled()) + if (_logger.isTraceEnabled()) { - _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); + _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag); } rejectMessage(message.getDeliverBody().deliveryTag, requeue); @@ -2260,9 +2308,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (_logger.isDebugEnabled()) + if (_logger.isTraceEnabled()) { - _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); + _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); } rejectMessage(message.getDeliveryTag(), requeue); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 9043faa80c..73010ce517 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.util.Iterator; import java.util.List; +import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -118,7 +119,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); - /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ + /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); /** @@ -135,6 +136,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private boolean _closeWhenNoMessages; private boolean _noConsume; + private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, @@ -157,6 +159,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); _autoClose = autoClose; _noConsume = noConsume; + + //Force queue browsers not to use acknowledge modes. + if (_noConsume) + { + _acknowledgeMode = Session.NO_ACKNOWLEDGE; + } } public AMQDestination getDestination() @@ -433,6 +441,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { + //synchronized (_closed) + if (_logger.isInfoEnabled()) { _logger.info("Closing consumer:" + debugIdentity()); @@ -442,6 +452,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (!_closed.getAndSet(true)) { + if (_logger.isTraceEnabled()) + { + if (_closedStack != null) + { + _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6)); + _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6); + } + } if (sendClose) { // TODO: Be aware of possible changes to parameter order as versions change. @@ -467,9 +489,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new JMSException("Error closing consumer: " + e); } } + else + { +// //fixme this probably is not right +// if (!isNoConsume()) + { //done in BasicCancelOK Handler but not sending one so just deregister. + deregisterConsumer(); + } + } - //done in BasicCancelOK Handler - //deregisterConsumer(); if (_messageListener != null && _receiving.get()) { if (_logger.isInfoEnabled()) @@ -488,7 +516,23 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ void markClosed() { - _closed.set(true); +// synchronized (_closed) + { + _closed.set(true); + + if (_logger.isTraceEnabled()) + { + if (_closedStack != null) + { + _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8); + } + } + } deregisterConsumer(); } @@ -520,11 +564,24 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("Message is of type: " + jmsMessage.getClass().getName()); } - jmsMessage.setConsumer(this); +// synchronized (_closed) + { +// if (!_closed.get()) + { + + jmsMessage.setConsumer(this); - preDeliver(jmsMessage); + preDeliver(jmsMessage); - notifyMessage(jmsMessage, channelId); + notifyMessage(jmsMessage, channelId); + } +// else +// { +// _logger.error("MESSAGE REJECTING!"); +// _session.rejectMessage(jmsMessage, true); +// //_logger.error("MESSAGE JUST DROPPED!"); +// } + } } catch (Exception e) { @@ -551,9 +608,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { //we do not need a lock around the test above, and the dispatch below as it is invalid //for an application to alter an installed listener while the session is started - preApplicationProcessing(jmsMessage); - getMessageListener().onMessage(jmsMessage); - postDeliver(jmsMessage); +// synchronized (_closed) + { +// if (!_closed.get()) + { + + preApplicationProcessing(jmsMessage); + getMessageListener().onMessage(jmsMessage); + postDeliver(jmsMessage); + } + } } else { @@ -649,14 +713,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer lastDeliveryTag = _receivedDeliveryTags.poll(); } + assert _receivedDeliveryTags.isEmpty(); + _session.acknowledgeMessage(lastDeliveryTag, true); } } void notifyError(Throwable cause) { - _closed.set(true); - +// synchronized (_closed) + { + _closed.set(true); + if (_logger.isTraceEnabled()) + { + if (_closedStack != null) + { + _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8)); + _logger.trace(_consumerTag + " previously" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8); + } + } + } //QPID-293 can "request redelivery of this error through dispatcher" // we have no way of propagating the exception to a message listener - a JMS limitation - so we @@ -761,14 +841,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { clearUnackedMessages(); - if (_logger.isDebugEnabled()) + if (!_receivedDeliveryTags.isEmpty()) { - _logger.debug("Rejecting received messages"); + _logger.debug("Rejecting received messages in _receivedDTs (RQ)"); } //rollback received but not committed messages while (!_receivedDeliveryTags.isEmpty()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" + + "for consumer with tag:" + _consumerTag); + } + Long tag = _receivedDeliveryTags.poll(); if (tag != null) @@ -782,12 +868,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + if (!_receivedDeliveryTags.isEmpty()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection"); + } + } + //rollback pending messages if (_synchronousQueue.size() > 0) { if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" + + _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + _consumerTag); } Iterator iterator = _synchronousQueue.iterator(); @@ -821,7 +915,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer rollback(); } - _synchronousQueue.clear(); + clearReceiveQueue(); } } @@ -831,4 +925,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return String.valueOf(_consumerTag); } + public void clearReceiveQueue() + { + _synchronousQueue.clear(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java index 6593f2c254..d246dc3931 100644 --- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -25,20 +25,18 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.IllegalStateException; import javax.jms.JMSException; -/** - * Provides support for orderly shutdown of an object. - */ +/** Provides support for orderly shutdown of an object. */ public abstract class Closeable { /** - * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing - * access to this flag would mean have a synchronized block in every method. + * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing access to this + * flag would mean have a synchronized block in every method. */ protected final AtomicBoolean _closed = new AtomicBoolean(false); protected void checkNotClosed() throws JMSException { - if (_closed.get()) + if (isClosed()) { throw new IllegalStateException("Object " + toString() + " has been closed"); } @@ -46,7 +44,10 @@ public abstract class Closeable public boolean isClosed() { - return _closed.get(); +// synchronized (_closed) + { + return _closed.get(); + } } public abstract void close() throws JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java index 794071cc34..0826deb2f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java @@ -42,6 +42,6 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener { _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); - //todo this should do the closure + //todo this should do the local closure } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 19767b6575..e875b4dca8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -51,10 +51,8 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.AMQConstant; /** - * Wrapper for protocol session that provides type-safe access to session attributes. - * <p/> - * The underlying protocol session is still available but clients should not - * use it to obtain session attributes. + * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol + * session is still available but clients should not use it to obtain session attributes. */ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { @@ -78,27 +76,23 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected WriteFuture _lastWriteFuture; /** - * The handler from which this session was created and which is used to handle protocol events. - * We send failover events to the handler. + * The handler from which this session was created and which is used to handle protocol events. We send failover + * events to the handler. */ protected final AMQProtocolHandler _protocolHandler; - /** - * Maps from the channel id to the AMQSession that it represents. - */ + /** Maps from the channel id to the AMQSession that it represents. */ protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); /** - * Maps from a channel id to an unprocessed message. This is used to tie together the - * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives + * first) with the subsequent content header and content bodies. */ protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); - /** - * Counter to ensure unique queue names - */ + /** Counter to ensure unique queue names */ protected int _queueId = 1; protected final Object _queueIdLock = new Object(); @@ -108,8 +102,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession /** - * No-arg constructor for use by test subclass - has to initialise final vars - * NOT intended for use other then for test + * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for + * test */ public AMQProtocolSession() { @@ -147,7 +141,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { // start the process of setting up the connection. This is the first place that // data is written to the server. - + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } @@ -207,8 +201,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession /** * Store the SASL client currently being used for the authentication handshake * - * @param client if non-null, stores this in the session. if null clears any existing client - * being stored + * @param client if non-null, stores this in the session. if null clears any existing client being stored */ public void setSaslClient(SaslClient client) { @@ -237,10 +230,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Callback invoked from the BasicDeliverMethodHandler when a message has been received. - * This is invoked on the MINA dispatcher thread. + * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA + * dispatcher thread. * * @param message + * * @throws AMQException if this was not expected */ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException @@ -295,8 +289,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Deliver a message to the appropriate session, removing the unprocessed message - * from our map + * Deliver a message to the appropriate session, removing the unprocessed message from our map * * @param channelId the channel id the message should be delivered to * @param msg the message @@ -309,8 +302,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Convenience method that writes a frame to the protocol session. Equivalent - * to calling getProtocolSession().write(). + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). * * @param frame the frame to write */ @@ -377,15 +370,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } /** - * Called from the ChannelClose handler when a channel close frame is received. - * This method decides whether this is a response or an initiation. The latter - * case causes the AMQSession to be closed and an exception to be thrown if + * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is + * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if * appropriate. * * @param channelId the id of the channel (session) - * @return true if the client must respond to the server, i.e. if the server - * initiated the channel close, false if the channel close is just the server - * responding to the client's earlier request to close the channel. + * + * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if + * the channel close is just the server responding to the client's earlier request to close the channel. */ public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException { @@ -450,9 +442,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** - * @param delay delay in seconds (not ms) - */ + /** @param delay delay in seconds (not ms) */ void initHeartbeats(int delay) { if (delay > 0) @@ -475,7 +465,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolMajorVersion = versionMajor; _protocolMinorVersion = versionMinor; - _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); } public byte getProtocolMinorVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 69042d08ea..8368eee125 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -38,12 +38,10 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; /** - * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up - * the underlying connector, which currently always uses TCP/IP sockets. It creates the - * "protocol handler" which deals with MINA protocol events. - * <p/> - * Could be extended in future to support different transport types by turning this into concrete class/interface - * combo. + * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying + * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA + * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete + * class/interface combo. */ public class TransportConnection { @@ -61,22 +59,6 @@ public class TransportConnection private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; - static - { - _acceptor = new VmPipeAcceptor(); - - IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - public static ITransportConnection getInstance() throws AMQTransportConnectionException - { - AMQBrokerDetails details = new AMQBrokerDetails(); - details.setTransport(BrokerDetails.TCP); - return getInstance(details); - } - public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -182,7 +164,14 @@ public class TransportConnection public static void createVMBroker(int port) throws AMQVMBrokerCreationException { + if (_acceptor == null) + { + _acceptor = new VmPipeAcceptor(); + IoServiceConfig config = _acceptor.getDefaultConfig(); + + config.setThreadModel(ReadWriteThreadModel.getInstance()); + } if (!_inVmPipeAddress.containsKey(port)) { diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index cb4ef01d25..642b928d81 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -44,6 +44,10 @@ public class FlowControllingBlockingQueue /** We require a separate count so we can track whether we have reached the threshold */ private int _count; + public boolean isEmpty() + { + return _queue.isEmpty(); + } public interface ThresholdListener { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index fe15fa5155..1e50a62fee 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -39,8 +39,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.message.JMSTextMessage; -import org.apache.qpid.testutil.VMBrokerSetup; public class PropertyValueTest extends TestCase implements MessageListener { @@ -59,19 +59,13 @@ public class PropertyValueTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - try - { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); - } - catch (Exception e) - { - fail("Unable to initialilse connection: " + e); - } + TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killVMBroker(1); } private void init(AMQConnection connection) throws Exception @@ -91,14 +85,48 @@ public class PropertyValueTest extends TestCase implements MessageListener connection.start(); } - public void test() throws Exception + public void testOnce() { - int count = _count; - send(count); - waitFor(count); - check(); - _logger.info("Completed without failure"); - _connection.close(); + runBatch(1); + } + + public void test50() + { + runBatch(50); + } + + private void runBatch(int runSize) + { + try + { + int run = 0; + while (run < runSize) + { + _logger.error("Run Number:" + run++); + try + { + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + } + catch (Exception e) + { + fail("Unable to initialilse connection: " + e); + } + + int count = _count; + send(count); + waitFor(count); + check(); + _logger.info("Completed without failure"); + _connection.close(); + + _logger.error("End Run Number:" + (run - 1)); + } + } + catch (Exception e) + { + _logger.error(e.getMessage(), e); + e.printStackTrace(); + } } void send(int count) throws JMSException @@ -138,7 +166,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.info("Message:" + m); + _logger.trace("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); @@ -150,7 +178,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setShortProperty("Short", (short) Short.MAX_VALUE); m.setStringProperty("String", "Test"); - _logger.info("Sending Msg:" + m); + _logger.debug("Sending Msg:" + m); producer.send(m); } } @@ -206,8 +234,11 @@ public class PropertyValueTest extends TestCase implements MessageListener Assert.assertEquals("Check String properties are correctly transported", "Test", m.getStringProperty("String")); } + received.clear(); assertEqual(messages.iterator(), actual.iterator()); + + messages.clear(); } private static void assertEqual(Iterator expected, Iterator actual) @@ -269,11 +300,11 @@ public class PropertyValueTest extends TestCase implements MessageListener { test._count = Integer.parseInt(argv[1]); } - test.test(); + test.testOnce(); } public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class)); + return new junit.framework.TestSuite(PropertyValueTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index a56bae3d70..7762cb3fe9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -42,6 +42,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; +import org.apache.qpid.testutil.QpidClientConnection; import org.apache.log4j.Logger; import org.apache.log4j.Level; @@ -62,14 +63,14 @@ public class MessageRequeueTest extends TestCase private boolean testReception = true; private long[] receieved = new long[numTestMessages + 1]; - private boolean passed=false; + private boolean passed = false; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue @@ -85,21 +86,28 @@ public class MessageRequeueTest extends TestCase { super.tearDown(); - if (!passed) + if (!passed) // clean up { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue conn.consume(queue, consumeTimeout); + + conn.disconnect(); } TransportConnection.killVMBroker(1); } - /** multiple consumers */ + /** + * multiple consumers + * + * @throws javax.jms.JMSException if a JMS problem occurs + * @throws InterruptedException on timeout + */ public void testDrain() throws JMSException, InterruptedException { - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -170,6 +178,7 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); + passed = true; } /** multiple consumers */ @@ -186,8 +195,8 @@ public class MessageRequeueTest extends TestCase Thread t4 = new Thread(c4); t1.start(); -// t2.start(); -// t3.start(); + t2.start(); + t3.start(); // t4.start(); try @@ -230,7 +239,7 @@ public class MessageRequeueTest extends TestCase } assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); - passed=true; + passed = true; } class Consumer implements Runnable @@ -248,7 +257,7 @@ public class MessageRequeueTest extends TestCase try { _logger.info("consumer-" + id + ": starting"); - QpidClientConnection conn = new QpidClientConnection(); + QpidClientConnection conn = new QpidClientConnection(BROKER); conn.connect(); @@ -318,286 +327,51 @@ public class MessageRequeueTest extends TestCase } - public class QpidClientConnection implements ExceptionListener + public void testRequeue() throws JMSException, AMQException, URLSyntaxException { - private boolean transacted = true; - private int ackMode = Session.CLIENT_ACKNOWLEDGE; - private Connection connection; - - private String virtualHost; - private String brokerlist; - private int prefetch; - protected Session session; - protected boolean connected; - - public QpidClientConnection() - { - super(); - setVirtualHost("/test"); - setBrokerList(BROKER); - setPrefetch(5000); - } - - - public void connect() throws JMSException - { - if (!connected) - { - /* - * amqp://[user:pass@][clientid]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - try - { - AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); - _logger.info("connecting to Qpid :" + brokerUrl); - connection = factory.createConnection(); - - // register exception listener - connection.setExceptionListener(this); - - session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - - - _logger.info("starting connection"); - connection.start(); - - connected = true; - } - catch (URLSyntaxException e) - { - throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); - } - } - } - - public void disconnect() throws JMSException - { - if (connected) - { - session.commit(); - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected"); - } - } - - public void disconnectWithoutCommit() throws JMSException - { - if (connected) - { - session.close(); - connection.close(); - connected = false; - _logger.info("disconnected without commit"); - } - } - - public String getBrokerList() - { - return brokerlist; - } - - public void setBrokerList(String brokerlist) - { - this.brokerlist = brokerlist; - } - - public String getVirtualHost() + int run = 0; + while (run < 10) { - return virtualHost; - } - - public void setVirtualHost(String virtualHost) - { - this.virtualHost = virtualHost; - } - - public void setPrefetch(int prefetch) - { - this.prefetch = prefetch; - } + run++; - - /** override as necessary */ - public void onException(JMSException exception) - { - _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); - } - - public boolean isConnected() - { - return connected; - } - - public Session getSession() - { - return session; - } - - /** - * Put a String as a text messages, repeat n times. A null payload will result in a null message. - * - * @param queueName The queue name to put to - * @param payload the content of the payload - * @param copies the number of messages to put - * - * @throws javax.jms.JMSException any exception that occurs - */ - public void put(String queueName, String payload, int copies) throws JMSException - { - if (!connected) - { - connect(); - } - - _logger.info("putting to queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageProducer sender = session.createProducer(queue); - - for (int i = 0; i < copies; i++) - { - Message m = session.createTextMessage(payload + i); - m.setIntProperty("index", i + 1); - sender.send(m); - } - - session.commit(); - sender.close(); - _logger.info("put " + copies + " copies"); - } - - /** - * GET the top message on a queue. Consumes the message. Accepts timeout value. - * - * @param queueName The quename to get from - * @param readTimeout The timeout to use - * - * @return the content of the text message if any - * - * @throws javax.jms.JMSException any exception that occured - */ - public Message getNextMessage(String queueName, long readTimeout) throws JMSException - { - if (!connected) + if (_logger.isInfoEnabled()) { - connect(); + _logger.info("testRequeue run " + run); } - Queue queue = session.createQueue(queueName); + String virtualHost = "/test"; + String brokerlist = BROKER; + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - final MessageConsumer consumer = session.createConsumer(queue); + Connection conn = new AMQConnection(brokerUrl); + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue q = session.createQueue(queue); - Message message = consumer.receive(readTimeout); - session.commit(); - consumer.close(); - - Message result; + _logger.debug("Create Consumer"); + MessageConsumer consumer = session.createConsumer(q); - // all messages we consume should be TextMessages - if (message instanceof TextMessage) - { - result = ((TextMessage) message); - } - else if (null == message) + try { - result = null; + Thread.sleep(2000); } - else + catch (InterruptedException e) { - _logger.info("warning: received non-text message"); - result = message; + // } - return result; - } + _logger.debug("Receiving msg"); + Message msg = consumer.receive(1000); - /** - * GET the top message on a queue. Consumes the message. - * - * @param queueName The Queuename to get from - * - * @return The string content of the text message, if any received - * - * @throws javax.jms.JMSException any exception that occurs - */ - public Message getNextMessage(String queueName) throws JMSException - { - return getNextMessage(queueName, 0); - } + assertNotNull("Message should not be null", msg); - /** - * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. - * - * @param queueName The Queue name to consume from - * @param readTimeout The timeout for each consume - * - * @throws javax.jms.JMSException Any exception that occurs during the consume - * @throws InterruptedException If the consume thread was interrupted during a consume. - */ - public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException - { - if (!connected) - { - connect(); - } - - _logger.info("consuming queue " + queueName); - Queue queue = session.createQueue(queueName); - - final MessageConsumer consumer = session.createConsumer(queue); - int messagesReceived = 0; - - _logger.info("consuming..."); - while ((consumer.receive(readTimeout)) != null) - { - messagesReceived++; - } - session.commit(); + // As we have not ack'd message will be requeued. + _logger.debug("Close Consumer"); consumer.close(); - _logger.info("consumed: " + messagesReceived); - } - } - - - public void testRequeue() throws JMSException, AMQException, URLSyntaxException - { - String virtualHost = "/test"; - String brokerlist = "vm://:1"; - String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; - Connection conn = new AMQConnection(brokerUrl); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue q = session.createQueue(queue); - - _logger.info("Create Consumer"); - MessageConsumer consumer = session.createConsumer(q); - - try - { - Thread.sleep(2000); - } - catch (InterruptedException e) - { - // + _logger.debug("Close Connection"); + conn.close(); } - - _logger.info("Receiving msg"); - Message msg = consumer.receive(); - - assertNotNull("Message should not be null", msg); - - _logger.info("Close Consumer"); - consumer.close(); - - _logger.info("Close Connection"); - conn.close(); } }
\ No newline at end of file diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java new file mode 100644 index 0000000000..f2afa472ab --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -0,0 +1,268 @@ +package org.apache.qpid.testutil; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.log4j.Logger; + +import javax.jms.ExceptionListener; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +public class QpidClientConnection implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnection(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:pass@][clientid]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage()); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +} + diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java index 883d5018cd..4636f44795 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -181,8 +181,37 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ @Override public Iterator<E> iterator() { - throw new RuntimeException("Not Implemented"); + final Iterator<E> mainMessageIterator = super.iterator(); + return new Iterator<E>() + { + final Iterator<E> _headIterator = _messageHead.iterator(); + final Iterator<E> _mainIterator = mainMessageIterator; + + Iterator<E> last; + + public boolean hasNext() + { + return _headIterator.hasNext() || _mainIterator.hasNext(); + } + public E next() + { + if (_headIterator.hasNext()) + { + last = _headIterator; + return _headIterator.next(); + } + else + { + last = _mainIterator; + return _mainIterator.next(); + } + } + public void remove() + { + last.remove(); + } + }; } @Override diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java new file mode 100644 index 0000000000..bbac06382d --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test; + +import junit.extensions.TestSetup; +import junit.framework.Test; +import junit.framework.TestCase; + +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import java.util.Hashtable; +import java.util.List; +import java.util.LinkedList; +import java.util.Map; +import java.util.HashMap; + +public class VMTestCase extends TestCase +{ + protected long RECEIVE_TIMEOUT = 1000l; // 1 sec + protected long CLOSE_TIMEOUT = 10000l; // 10 secs + + protected Context _context; + protected String _clientID; + protected String _virtualhost; + protected String _brokerlist; + + protected final Map<String, String> _connections = new HashMap<String, String>(); + protected final Map<String, String> _queues = new HashMap<String, String>(); + protected final Map<String, String> _topics = new HashMap<String, String>(); + + protected void setUp() throws Exception + { + super.setUp(); + try + { + TransportConnection.createVMBroker(1); + } + catch (Exception e) + { + fail("Unable to create broker: " + e); + } + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + if (_clientID == null) + { + _clientID = this.getClass().getName(); + } + + if (_virtualhost == null) + { + _virtualhost = "/test"; + } + + if (_brokerlist == null) + { + _brokerlist = "vm://:1"; + } + + env.put("connectionfactory.connection", "amqp://client:client@" + + _clientID + _virtualhost + "?brokerlist='" + _brokerlist + "'"); + + for (Map.Entry<String, String> c : _connections.entrySet()) + { + env.put("connectionfactory." + c.getKey(), c.getValue()); + } + + env.put("queue.queue", "queue"); + + for (Map.Entry<String, String> q : _queues.entrySet()) + { + env.put("queue." + q.getKey(), q.getValue()); + } + + env.put("topic.topic", "topic"); + + for (Map.Entry<String, String> t : _topics.entrySet()) + { + env.put("topic." + t.getKey(), t.getValue()); + } + + _context = factory.getInitialContext(env); + } + + protected void tearDown() throws Exception + { + TransportConnection.killVMBroker(1); + super.tearDown(); + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java new file mode 100644 index 0000000000..ac65eec979 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.test.client; + +import org.apache.qpid.test.VMTestCase; +import org.apache.log4j.Logger; + +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.MessageConsumer; +import javax.jms.QueueBrowser; +import javax.jms.TextMessage; +import javax.jms.JMSException; +import javax.jms.QueueReceiver; +import javax.jms.Message; +import java.util.Enumeration; + +public class QueueBrowserTest extends VMTestCase +{ + private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class); + + private static final int MSG_COUNT = 10; + + private Connection _clientConnection; + private Session _clientSession; + private Queue _queue; + + public void setUp() throws Exception + { + + super.setUp(); + + _queue = (Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //Ensure _queue is created + _clientSession.createConsumer(_queue).close(); + + //Create Producer put some messages on the queue + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(_queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + } + + /* + * Test Messages Remain on Queue + * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there + * + */ + + public void queueBrowserMsgsRemainOnQueueTest() throws JMSException + { + + // create QueueBrowser + _logger.info("Creating Queue Browser"); + + QueueBrowser queueBrowser = _clientSession.createBrowser(_queue); + + // check for messages + if (_logger.isDebugEnabled()) + { + _logger.debug("Checking for " + MSG_COUNT + " messages with QueueBrowser"); + } + + int msgCount = 0; + Enumeration msgs = queueBrowser.getEnumeration(); + + while (msgs.hasMoreElements()) + { + msgs.nextElement(); + msgCount++; + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Found " + msgCount + " messages total in browser"); + } + + // check to see if all messages found +// assertEquals("browser did not find all messages", MSG_COUNT, msgCount); + if (msgCount != MSG_COUNT) + { + _logger.warn(msgCount + "/" + MSG_COUNT + " messages received."); + } + + //Close browser + queueBrowser.close(); + + // VERIFY + + // continue and try to receive all messages + MessageConsumer consumer = _clientSession.createConsumer(_queue); + + _logger.info("Verify messages are still on the queue"); + + Message tempMsg; + + for (msgCount = 0; msgCount < MSG_COUNT; msgCount++) + { + tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT); + if (tempMsg == null) + { + fail("Message " + msgCount + " not retrieved from queue"); + } + } + + _logger.info("All messages recevied from queue"); + } + + +} |
