summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java281
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java74
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java49
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java173
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java121
-rw-r--r--java/client/pom.xml7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java231
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java84
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java603
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java106
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java122
-rw-r--r--java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java26
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java3
30 files changed, 1599 insertions, 454 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index be2cee79ee..5dd6619cff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -99,7 +99,7 @@ public class AMQChannel
private final MessageRouter _exchanges;
- private TransactionalContext _txnContext;
+ private TransactionalContext _txnContext, _nonTransactedContext;
/**
* A context used by the message store enabling it to track context for a given channel even across thread
@@ -113,9 +113,9 @@ public class AMQChannel
private Set<Long> _browsedAcks = new HashSet<Long>();
+ //Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
-
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges)
throws AMQException
{
@@ -210,9 +210,9 @@ public class AMQChannel
}
else
{
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Content header received on channel " + _channelId);
+ _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
routeCurrentMessage();
@@ -234,9 +234,9 @@ public class AMQChannel
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Content body received on channel " + _channelId);
+ _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
}
try
{
@@ -289,8 +289,10 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
- * @param noLocal
- * @param exclusive
+ * @param noLocal Flag stopping own messages being receivied.
+ * @param exclusive Flag requesting exclusive access to the queue
+ * @param acks Are acks enabled for this subscriber
+ * @param filters Filters to apply to this subscriber
*
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
@@ -327,6 +329,8 @@ public class AMQChannel
/**
* Called from the protocol session to close this channel and clean up. T
*
+ * @param session The session to close
+ *
* @throws AMQException if there is an error during closure
*/
public void close(AMQProtocolSession session) throws AMQException
@@ -352,10 +356,23 @@ public class AMQChannel
* @param message the message that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
+ * @param consumerTag The tag for the consumer that is to acknowledge this message.
* @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
{
+ if (_log.isDebugEnabled())
+ {
+ if (queue == null)
+ {
+ _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
+ }
+ else
+ {
+ _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+ }
+ }
+
synchronized (_unacknowledgedMessageMap.getLock())
{
_unacknowledgedMessageMap.add(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
@@ -363,8 +380,15 @@ public class AMQChannel
}
}
+ private final String id = "(" + System.identityHashCode(this) + ")";
+
+ public String debugIdentity()
+ {
+ return _channelId + id;
+ }
+
/**
- * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel. May result in delivery to
+ * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to
* this same channel or to other subscribers.
*
* @throws org.apache.qpid.AMQException if the requeue fails
@@ -374,11 +398,22 @@ public class AMQChannel
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
- TransactionalContext nontransacted = null;
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
}
@@ -386,72 +421,130 @@ public class AMQChannel
{
if (unacked.queue != null)
{
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- if (!(_txnContext instanceof NonTransactionalContext))
- {
- nontransacted.deliver(unacked.message, unacked.queue, false);
- }
- else
- {
- _txnContext.deliver(unacked.message, unacked.queue, false);
- }
+ unacked.message.setRedelivered(true);
+
+ // Deliver Message
+ deliveryContext.deliver(unacked.message, unacked.queue, false);
+
+ // Should we allow access To the DM to directy deliver the message?
+ // As we don't need to check for Consumers or worry about incrementing the message count?
+// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false);
}
}
}
+ /**
+ * Requeue a single message
+ *
+ * @param deliveryTag The message to requeue
+ *
+ * @throws AMQException If something goes wrong.
+ */
public void requeue(long deliveryTag) throws AMQException
{
UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
- TransactionalContext nontransacted = null;
+
+ // Ensure message is released for redelivery
+ unacked.message.release();
+
+ // Mark message redelivered
+ unacked.message.setRedelivered(true);
+
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
}
- if (!(_txnContext instanceof NonTransactionalContext))
+
+ if (unacked.queue != null)
{
- nontransacted.deliver(unacked.message, unacked.queue, false);
+ //Redeliver the messages to the front of the queue
+ deliveryContext.deliver(unacked.message, unacked.queue, true);
+
+ unacked.message.decrementReference(_storeContext);
}
else
{
- _txnContext.deliver(unacked.message, unacked.queue, false);
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
+ " but no queue defined and no DeadLetter queue so DROPPING message.");
+// _log.error("Requested requeue of message:" + deliveryTag +
+// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
+//
+// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
+//
+// unacked.message.decrementReference(_storeContext);
}
- unacked.message.decrementReference(_storeContext);
}
else
{
- _log.error("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists");
+ _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
+
+ if (_log.isDebugEnabled())
+ {
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ int count = 0;
+
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
+ "[" + message.deliveryTag + "]");
+ return false; // Continue
+ }
+
+ public void visitComplete()
+ {
+
+ }
+ });
+ }
}
}
- /** Called to resend all outstanding unacknowledged messages to this same channel.
- * @param session the session
- * @param requeue if true then requeue, else resend
- * @throws org.apache.qpid.AMQException */
- public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+ /**
+ * Called to resend all outstanding unacknowledged messages to this same channel.
+ *
+ * @param requeue Are the messages to be requeued or dropped.
+ *
+ * @throws AMQException When something goes wrong.
+ */
+ public void resend(final boolean requeue) throws AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
if (_log.isInfoEnabled())
{
- _log.info("unacked map contains " + _unacknowledgedMessageMap.size());
+ _log.info("unacked map Size:" + _unacknowledgedMessageMap.size());
}
+ // Process the Unacked-Map.
+ // Marking messages who still have a consumer for to be resent
+ // and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- long deliveryTag = message.deliveryTag;
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
msg.setRedelivered(true);
@@ -503,14 +596,21 @@ public class AMQChannel
{
if (!msgToResend.isEmpty())
{
- _log.info("Preparing (" + msgToResend.size() + ") message to resend to.");
+ _log.info("Preparing (" + msgToResend.size() + ") message to resend.");
+ }
+ else
+ {
+ _log.info("No message to resend.");
}
}
for (UnacknowledgedMessage message : msgToResend)
{
AMQMessage msg = message.message;
- // Our Java Client will always suspend the channel when resending!!
+ // Our Java Client will always suspend the channel when resending!
+ // If the client has requested the messages be resent then it is
+ // their responsibility to ensure that thay are capable of receiving them
+ // i.e. The channel hasn't been server side suspended.
// if (isSuspended())
// {
// _log.info("Channel is suspended so requeuing");
@@ -518,50 +618,58 @@ public class AMQChannel
// msgToRequeue.add(message);
// }
// else
- {
- //release to allow it to be delivered
- msg.release();
+// {
+ //release to allow it to be delivered
+ msg.release();
- // Without any details from the client about what has been processed we have to mark
- // all messages in the unacked map as redelivered.
- msg.setRedelivered(true);
+ // Without any details from the client about what has been processed we have to mark
+ // all messages in the unacked map as redelivered.
+ msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription();
+ Subscription sub = msg.getDeliveredSubscription();
- if (sub != null)
+ if (sub != null)
+ {
+ // Get the lock so we can tell if the sub scription has closed.
+ // will stop delivery to this subscription until the lock is released.
+ // note: this approach would allow the use of a single queue if the
+ // PreDeliveryQueue would allow head additions.
+ // In the Java Qpid client we are suspended whilst doing this so it is all rather Mute..
+ // needs guidance from AMQP WG Model SIG
+ synchronized (sub.getSendLock())
{
- synchronized (sub.getSendLock())
+ if (sub.isClosed())
{
- if (sub.isClosed())
+ if (_log.isDebugEnabled())
{
- _log.info("Subscription closed during resend so requeuing message");
- //move this message to requeue
- msgToRequeue.add(message);
+ _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message");
}
- else
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Requeuing (" + System.identityHashCode(msg) + ") for resend");
- }
- // Will throw an exception if the sub is closed
- sub.addToResendQueue(msg);
- _unacknowledgedMessageMap.remove(message.deliveryTag);
- // Don't decrement as we are bypassing the normal deliver which increments
- // this is what there is a decrement on the Requeue as deliver will increment.
- // msg.decrementReference(_storeContext);
+ _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub));
}
+ sub.addToResendQueue(msg);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ // Don't decrement as we are bypassing the normal deliver which increments
+ // this is why there is a decrement on the Requeue as deliver will increment.
+ // msg.decrementReference(_storeContext);
}
- }
- else
- {
- _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
- //move this message to requeue
- msgToRequeue.add(message);
- }
+ } // sync(sub.getSendLock)
}
- }
+ else
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+ //move this message to requeue
+ msgToRequeue.add(message);
+ }
+ } // for all messages
+// } else !isSuspend
if (_log.isInfoEnabled())
{
@@ -571,26 +679,31 @@ public class AMQChannel
}
}
- TransactionalContext nontransacted = null;
+ // Deliver these messages out of the transaction as their delivery was never
+ // part of the transaction only the receive.
+ TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- nontransacted = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
+
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
}
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- if (!(_txnContext instanceof NonTransactionalContext))
- {
- nontransacted.deliver(message.message, message.queue, true);
- }
- else
- {
- _txnContext.deliver(message.message, message.queue, true);
- }
+ message.message.release();
+ message.message.setRedelivered(true);
+
+ deliveryContext.deliver(message.message, message.queue, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
message.message.decrementReference(_storeContext);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 3f2348b71b..940b5b2bf1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -42,6 +42,21 @@ public class UnacknowledgedMessage
message.incrementReference();
}
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Q:");
+ sb.append(queue);
+ sb.append(" M:");
+ sb.append(message);
+ sb.append(" CT:");
+ sb.append(consumerTag);
+ sb.append(" DT:");
+ sb.append(deliveryTag);
+
+ return sb.toString();
+ }
+
public void discard(StoreContext storeContext) throws AMQException
{
if (queue != null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 99cc60011a..30bbdea2ef 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -196,25 +196,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
}
-
- public void resendMessages(AMQProtocolSession protocolSession, int channelId) throws AMQException
- {
- synchronized (_lock)
- {
- for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
- {
- long deliveryTag = entry.getKey();
- AMQShortString consumerTag = entry.getValue().consumerTag;
- AMQMessage msg = entry.getValue().message;
-
- if(consumerTag != null)
- {
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channelId, deliveryTag, consumerTag);
- }
- }
- }
- }
-
+
public UnacknowledgedMessage get(long key)
{
synchronized (_lock)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
index f93b2b25e6..a6972475a6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicAckMethodHandler.java
@@ -47,12 +47,13 @@ public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckB
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicAckBody> evt) throws AMQException
{
AMQProtocolSession protocolSession = stateManager.getProtocolSession();
+ BasicAckBody body = evt.getMethod();
if (_log.isDebugEnabled())
{
- _log.debug("Ack received on channel " + evt.getChannelId());
+ _log.debug("Ack(Tag:" + body.deliveryTag + ":Mult:" + body.multiple + ") received on channel " + evt.getChannelId());
}
- BasicAckBody body = evt.getMethod();
+
final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
if (channel == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index bc11e4652c..a436c35473 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -54,7 +54,7 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
throw body.getChannelNotFoundException(evt.getChannelId());
}
- channel.resend(session, body.requeue);
+ channel.resend(body.requeue);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index ed13092ded..4e77a5e8b9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -49,20 +50,67 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
{
AMQProtocolSession session = stateManager.getProtocolSession();
- _logger.info("FIXME: Rejecting:" + evt.getMethod().deliveryTag + ": Requeue:" + evt.getMethod().requeue);
-
int channelId = evt.getChannelId();
- UnacknowledgedMessage message = session.getChannel(channelId).getUnacknowledgedMessageMap().get(evt.getMethod().deliveryTag);
-
- _logger.info("Need to reject message:" + message);
-// if (evt.getMethod().requeue)
-// {
-// session.getChannel(channelId).requeue(evt.getMethod().deliveryTag);
-// }
-// else
-// {
-// // session.getChannel(channelId).resend(message);
-// }
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ ": Requeue:" + evt.getMethod().requeue +
+// ": Resend:" + evt.getMethod().resend +
+ " on channel:" + channelId);
+ }
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw evt.getMethod().getChannelNotFoundException(channelId);
+ }
+
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ ": Requeue:" + evt.getMethod().requeue +
+// ": Resend:" + evt.getMethod().resend +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ long deliveryTag = evt.getMethod().deliveryTag;
+
+ UnacknowledgedMessage message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
+ }
+ else
+ {
+
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.message.debugIdentity() +
+ ": Requeue:" + evt.getMethod().requeue +
+// ": Resend:" + evt.getMethod().resend +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
+// if (!evt.getMethod().resend)
+ {
+ message.message.reject(message.message.getDeliveredSubscription());
+ }
+
+ if (evt.getMethod().requeue)
+ {
+ channel.requeue(deliveryTag);
+ }
+ else
+ {
+ _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
+// message.queue = channel.getDefaultDeadLetterQueue();
+// channel.requeue(deliveryTag);
+ }
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 9a8fce7129..777784ca30 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -63,6 +63,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
}
session.closeChannel(channelId);
+ // Client requested closure so we don't wait for ok we send it
+ stateManager.getProtocolSession().closeChannelOk(channelId);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
index a10f44f906..f747f7a840 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
@@ -63,7 +63,7 @@ public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBod
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(session, false);
+ channel.resend(false);
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index d71f6e3046..133f4809b4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -523,8 +523,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
try
{
- markChannelawaitingCloseOk(channelId);
channel.close(this);
+ markChannelawaitingCloseOk(channelId);
}
finally
{
@@ -546,7 +546,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* In our current implementation this is used by the clustering code.
*
- * @param channelId
+ * @param channelId The channel to remove
*/
public void removeChannel(int channelId)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index dedea68d18..6d375c89fe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -85,11 +85,18 @@ public class AMQMessage
private Subscription _takenBySubcription;
+ private Set<Subscription> _rejectedBy = null;
+
public boolean isTaken()
{
return _taken.get();
}
+ public String debugIdentity()
+ {
+ return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -199,7 +206,7 @@ public class AMQMessage
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
- _log.debug("Message created with id " + messageId);
+ _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
}
}
@@ -452,7 +459,9 @@ public class AMQMessage
public void release()
{
+ _log.trace("Releasing Message:" + debugIdentity());
_taken.set(false);
+ _takenBySubcription = null;
}
public boolean checkToken(Object token)
@@ -511,7 +520,7 @@ public class AMQMessage
* @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
* to a consumer
*/
- public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
+ public void checkDeliveredToConsumer() throws NoConsumersException
{
if (_immediate && !_deliveredToConsumer)
@@ -580,7 +589,8 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
- _txnContext.deliver(this, q, true);
+ //normal deliver so add this message at the end.
+ _txnContext.deliver(this, q, false);
}
}
finally
@@ -801,7 +811,7 @@ public class AMQMessage
public String toString()
{
- return "Message: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
_taken + " by:" + _takenBySubcription;
}
@@ -809,4 +819,35 @@ public class AMQMessage
{
return _takenBySubcription;
}
+
+ public void reject(Subscription subscription)
+ {
+ if (subscription != null)
+ {
+ if (_rejectedBy == null)
+ {
+ _rejectedBy = new HashSet<Subscription>();
+ }
+
+ _rejectedBy.add(subscription);
+ }
+ else
+ {
+ _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ }
+ }
+
+ public boolean isRejectedBy(Subscription subscription)
+ {
+ boolean rejected = _rejectedBy != null;
+
+ if (rejected) // We have subscriptions that rejected this message
+ {
+ return _rejectedBy.contains(subscription);
+ }
+ else // This messasge hasn't been rejected yet.
+ {
+ return rejected;
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 5bbe1671a7..7c2fe73386 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
*/
public class AMQQueue implements Managable, Comparable
{
+
public static final class ExistingExclusiveSubscription extends AMQException
{
@@ -446,7 +447,11 @@ public class AMQQueue implements Managable, Comparable
setExclusive(true);
}
- debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Registering protocol session {0} with channel {1} and " +
+ "consumer tag {2} with {3}", ps, channel, consumerTag, this));
+ }
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
@@ -486,8 +491,11 @@ public class AMQQueue implements Managable, Comparable
public void unregisterProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag) throws AMQException
{
- debug("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
- this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Unregistering protocol session {0} with channel {1} and consumer tag {2} from {3}", ps, channel, consumerTag,
+ this));
+ }
Subscription removedSubscription;
if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
@@ -506,6 +514,10 @@ public class AMQQueue implements Managable, Comparable
// if we are eligible for auto deletion, unregister from the queue registry
if (_autoDelete && _subscribers.isEmpty())
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.warn("Auto-deleteing queue:" + this);
+ }
autodelete();
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
@@ -561,14 +573,18 @@ public class AMQQueue implements Managable, Comparable
protected void autodelete() throws AMQException
{
- debug("autodeleting {0}", this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("autodeleting {0}", this));
+ }
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg) throws AMQException
+ public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
//fixme not sure what this is doing. should we be passing deliverFirst through here?
- _deliveryMgr.deliver(storeContext, getName(), msg, false);
+ // This code is not used so when it is perhaps it should
+ _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
try
{
msg.checkDeliveredToConsumer();
@@ -582,6 +598,10 @@ public class AMQQueue implements Managable, Comparable
}
}
+// public DeliveryManager getDeliveryManager()
+// {
+// return _deliveryMgr;
+// }
public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
@@ -673,14 +693,6 @@ public class AMQQueue implements Managable, Comparable
return "Queue(" + _name + ")@" + System.identityHashCode(this);
}
- private void debug(String msg, Object... args)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(msg, args));
- }
- }
-
public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
{
return _deliveryMgr.performGet(session, channel, acks);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index e70926736d..601effcec7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.util.MessageQueue;
import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
/** Manages delivery of messages on behalf of a queue */
@@ -86,6 +85,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private AtomicLong _totalMessageSize = new AtomicLong();
private AtomicInteger _extraMessages = new AtomicInteger();
private Set<Subscription> _hasContent = Collections.synchronizedSet(new HashSet<Subscription>());
+ private final Object _queueHeadLock = new Object();
+ private String _processingThreadName = "";
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -118,7 +119,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (deliverFirst)
{
- _messages.pushHead(msg);
+ synchronized (_queueHeadLock)
+ {
+ _messages.pushHead(msg);
+ }
}
else
{
@@ -367,16 +371,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
long count = 0;
_lock.lock();
- AMQMessage msg = getNextMessage();
- while (msg != null)
+ synchronized (_queueHeadLock)
{
- //mark this message as taken and get it removed
- msg.taken(null);
- _queue.dequeue(storeContext, msg);
- msg = getNextMessage();
- count++;
- }
+ AMQMessage msg = getNextMessage();
+ while (msg != null)
+ {
+ //and remove it
+ _messages.poll();
+ _queue.dequeue(storeContext, msg);
+ msg = getNextMessage();
+ count++;
+ }
+ }
_lock.unlock();
return count;
}
@@ -390,12 +397,20 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
-
- while (message != null && ((sub == null || sub.isBrowser()) || message.taken(sub)))
+ while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
{
//remove the already taken message
- messages.poll();
+ AMQMessage removed = messages.poll();
+
+ assert removed == message;
+
_totalMessageSize.addAndGet(-message.getSize());
+
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Removed taken message:" + message.debugIdentity());
+ }
+
// try the next message
message = messages.peek();
}
@@ -409,7 +424,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isTraceEnabled())
{
- _log.trace("Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
") from queue (" + System.identityHashCode(messageQueue) +
") AMQQueue (" + System.identityHashCode(queue) + ")");
}
@@ -417,46 +432,63 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (messageQueue == null)
{
// There is no queue with messages currently. This is ok... just means the queue has no msgs matching selector
- if (_log.isDebugEnabled())
+ if (_log.isInfoEnabled())
{
- _log.debug(sub + ": asked to send messages but has none on given queue:" + queue);
+ _log.info(debugIdentity() + sub + ": asked to send messages but has none on given queue:" + queue);
}
return;
}
AMQMessage message = null;
+ AMQMessage removed = null;
try
{
- message = getNextMessage(messageQueue, sub);
-
- // message will be null if we have no messages in the messageQueue.
- if (message == null)
+ synchronized (_queueHeadLock)
{
- if (_log.isTraceEnabled())
+ message = getNextMessage(messageQueue, sub);
+
+ // message will be null if we have no messages in the messageQueue.
+ if (message == null)
{
- _log.trace("No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ }
+ return;
+ }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + "Async Delivery Message " + message.getMessageId() + "(" + System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
- return;
+
+ sub.send(message, _queue);
+
+ //remove sent message from our queue.
+ removed = messageQueue.poll();
+ //If we don't remove the message from _messages
+ // Otherwise the Async send will never end
}
+
+ if (removed != message)
+ {
+ _log.error("Just send message:" + message.debugIdentity() + " BUT removed this from queue:" + removed);
+ }
+
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message (" + System.identityHashCode(message) +
+ _log.debug(debugIdentity() + "Async Delivered Message r:" + removed.debugIdentity() + "d:" + message.debugIdentity() +
") by :" + System.identityHashCode(this) +
") to :" + System.identityHashCode(sub));
}
- sub.send(message, _queue);
-
- //remove sent message from our queue.
- messageQueue.poll();
- //If we don't remove the message from _messages
- // Otherwise the Async send will never end
if (messageQueue == sub.getResendQueue())
{
if (_log.isTraceEnabled())
{
- _log.trace("All messages sent from resendQueue for " + sub);
+ _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub);
}
if (messageQueue.isEmpty())
{
@@ -469,7 +501,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else if (messageQueue == sub.getPreDeliveryQueue())
{
- _log.info("We could do clean up of the main _message queue here");
+ if (_log.isInfoEnabled())
+ {
+ _log.info(debugIdentity() + "We could do clean up of the main _message queue here");
+ }
}
_totalMessageSize.addAndGet(-message.getSize());
@@ -477,7 +512,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
catch (AMQException e)
{
message.release();
- _log.error("Unable to deliver message as dequeue failed: " + e, e);
+ _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -516,6 +551,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*/
private void processQueue()
{
+ //record thread name
+ if (_log.isDebugEnabled())
+ {
+ _processingThreadName = Thread.currentThread().getName();
+ }
+
// Continue to process delivery while we haveSubscribers and messages
boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
@@ -561,9 +602,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "deliver :first(" + deliverFirst + ") :" + msg);
+ _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
}
- msg.release();
+ // This shouldn't be done here.
+// msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -575,7 +617,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
+ _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
if (!msg.getMessagePublishInfo().isImmediate())
{
@@ -587,7 +629,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to:" + currentStatus());
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -598,7 +640,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
") is already delivered.");
}
continue;
@@ -609,7 +651,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg, deliverFirst);
@@ -625,9 +667,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (!s.isSuspended())
{
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
msg.taken(s);
@@ -636,33 +678,35 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else
{
- if (_log.isDebugEnabled())
+ if (_log.isInfoEnabled())
{
- _log.debug(id() + " Subscription(" + System.identityHashCode(s) + ") became suspended between nextSubscriber and send");
+ _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
+ "suspended between nextSubscriber and send for message:" + msg.debugIdentity());
}
}
+ }
- if (!msg.isTaken())
+ if (!msg.isTaken())
+ {
+ if (_log.isInfoEnabled())
{
- if (_log.isDebugEnabled())
- {
- _log.debug(id() + " Message(" + System.identityHashCode(msg) + ") has not been taken so recursing!:" +
- " Subscriber:" + System.identityHashCode(s));
- }
-
- deliver(context, name, msg, deliverFirst);
+ _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+ " Subscriber:" + System.identityHashCode(s));
}
- else
+
+ deliver(context, name, msg, deliverFirst);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
{
- if (_log.isDebugEnabled())
- {
- _log.debug(id() + " Message(" + System.identityHashCode(msg) +
- ") has been taken so disregarding deliver request to Subscriber:" +
- System.identityHashCode(s));
- }
+ _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+ ") has been taken so disregarding deliver request to Subscriber:" +
+ System.identityHashCode(s));
}
}
}
+
}
finally
{
@@ -674,10 +718,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- //fixme remove
private final String id = "(" + String.valueOf(System.identityHashCode(this)) + ")";
- private String id()
+ private String debugIdentity()
{
return id;
}
@@ -710,7 +753,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug("Processing Async." + currentStatus());
+ _log.debug(debugIdentity() + "Processing Async." + currentStatus());
}
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
@@ -725,14 +768,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
+ "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get() +
- " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains") +
- "(" + _messages.size() + ":" + ((ConcurrentLinkedQueue) _messages).size() + ") ";
+ " Processing:" + (_processing.get() ? " true : Processing Thread: " + _processingThreadName : " false");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 0a2e73880c..20033daac7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -46,6 +46,8 @@ import org.apache.qpid.util.ConcurrentLinkedMessageQueueAtomicSize;
*/
public class SubscriptionImpl implements Subscription
{
+
+ private static final Logger _suspensionlogger = Logger.getLogger("Suspension");
private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
public final AMQChannel channel;
@@ -258,6 +260,12 @@ public class SubscriptionImpl implements Subscription
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
+
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
+
protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
}
}
@@ -265,56 +273,56 @@ public class SubscriptionImpl implements Subscription
private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
throws AMQException
{
- try
- {
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
+ {
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
- }
- queue.dequeue(storeContext, msg);
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
}
- synchronized (channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
-
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- msg.decrementReference(storeContext);
- }
+ queue.dequeue(storeContext, msg);
+ }
+ synchronized (channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ msg.decrementReference(storeContext);
}
- }
- finally
- {
+
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ //Only set delivered if it actually was writen successfully..
+ // using a try->finally would set it even if an error occured.
msg.setDeliveredToConsumer();
}
}
public boolean isSuspended()
{
- if (_logger.isTraceEnabled())
+ if (_suspensionlogger.isInfoEnabled())
{
if (channel.isSuspended())
{
- _logger.trace("Subscription(" + System.identityHashCode(this) + ") channel's is susupended");
+ _suspensionlogger.info("Subscription(" + debugIdentity() + ") channel's is susupended");
}
if (_sendLock.get())
{
- _logger.trace("Subscription(" + System.identityHashCode(this) + ") has sendLock set so closing.");
+ _suspensionlogger.info("Subscription(" + debugIdentity() + ") has sendLock set so closing.");
}
}
return channel.isSuspended() || _sendLock.get();
@@ -323,7 +331,7 @@ public class SubscriptionImpl implements Subscription
/**
* Callback indicating that a queue has been deleted.
*
- * @param queue
+ * @param queue The queue to delete
*/
public void queueDeleted(AMQQueue queue) throws AMQException
{
@@ -337,9 +345,18 @@ public class SubscriptionImpl implements Subscription
public boolean hasInterest(AMQMessage msg)
{
+ //check that the message hasn't been rejected
+ if (msg.isRejectedBy(this))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + msg.debugIdentity());
+ }
+// return false;
+ }
+
if (_noLocal)
{
- boolean isLocal;
// We don't want local messages so check to see if message is one we sent
Object localInstance;
Object msgInstance;
@@ -350,12 +367,12 @@ public class SubscriptionImpl implements Subscription
if ((msg.getPublisher().getClientProperties() != null) &&
(msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ if (localInstance == msgInstance || localInstance.equals(msgInstance))
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ msg.debugIdentity() + ")");
}
return false;
}
@@ -369,8 +386,8 @@ public class SubscriptionImpl implements Subscription
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" +
+ msg.debugIdentity() + ")");
}
return false;
}
@@ -383,19 +400,26 @@ public class SubscriptionImpl implements Subscription
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+ _logger.trace("(" + debugIdentity() + ") checking filters for message (" + msg.debugIdentity());
}
return checkFilters(msg);
}
+ private String id = String.valueOf(System.identityHashCode(this));
+
+ private String debugIdentity()
+ {
+ return id;
+ }
+
private boolean checkFilters(AMQMessage msg)
{
if (_filters != null)
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+ _logger.trace("(" + debugIdentity() + ") has filters.");
}
return _filters.allAllow(msg);
}
@@ -403,7 +427,7 @@ public class SubscriptionImpl implements Subscription
{
if (_logger.isTraceEnabled())
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ _logger.trace("(" + debugIdentity() + ") has no filters");
}
return true;
@@ -445,15 +469,19 @@ public class SubscriptionImpl implements Subscription
}
_sendLock.set(true);
-
}
+
if (_logger.isInfoEnabled())
{
- _logger.info("Closing subscription (" + System.identityHashCode(this) + "):" + this);
+ _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
}
if (_resendQueue != null && !_resendQueue.isEmpty())
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Requeuing closing subscription (" + debugIdentity() + "):" + this);
+ }
requeue();
}
@@ -486,6 +514,11 @@ public class SubscriptionImpl implements Subscription
{
AMQMessage resent = _resendQueue.poll();
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Removed for resending:" + resent.debugIdentity());
+ }
+
resent.release();
_queue.subscriberHasPendingResend(false, this, resent);
@@ -495,7 +528,7 @@ public class SubscriptionImpl implements Subscription
}
catch (AMQException e)
{
- _logger.error("Unable to re-deliver messages", e);
+ _logger.error("MESSAGE LOSS : Unable to re-deliver messages", e);
}
}
diff --git a/java/client/pom.xml b/java/client/pom.xml
index 617390c059..abac5b3f1a 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -55,6 +55,13 @@
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
+ <!-- commons collection exports log4j v1.2.7 which doesn't have trace()-->
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 89f596e541..61143eee69 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -198,9 +198,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final Object _suspensionLock = new Object();
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
+ private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+
private class Dispatcher extends Thread
{
@@ -212,12 +213,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " created");
+ }
}
public void run()
{
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " started");
+ }
+
UnprocessedMessage message;
+ // Allow disptacher to start stopped
+ synchronized (_lock)
+ {
+ while (connectionStopped())
+ {
+ try
+ {
+ _lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
+ }
+
try
{
while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
@@ -243,10 +269,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
catch (InterruptedException e)
{
- ;
+ //ignore
+ }
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
}
-
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
}
// only call while holding lock
@@ -263,6 +291,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
currently = _connectionStopped;
_connectionStopped = connectionStopped;
_lock.notify();
+
+ if (_dispatcherLogger.isDebugEnabled())
+ {
+ _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") +
+ ": Currently " + (currently ? "Started" : "Stopped"));
+ }
}
return currently;
}
@@ -275,9 +309,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
- _logger.warn("Consumers that exist: " + _consumers);
- _logger.warn("Session hashcode: " + System.identityHashCode(this));
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+ "[" + message.getDeliverBody().deliveryTag + "] from queue "
+ + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)...");
+ }
+
+ rejectMessage(message, true);
}
else
{
@@ -311,7 +350,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
rejectAllMessages(true);
- _logger.debug("Session Pre Dispatch Queue cleared");
+ _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
for (BasicMessageConsumer consumer : _consumers.values())
{
@@ -323,20 +362,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
- public void rejectPending(AMQShortString consumerTag)
+ public void rejectPending(BasicMessageConsumer consumer)
{
synchronized (_lock)
{
- boolean stopped = connectionStopped();
+ boolean stopped = _dispatcher.connectionStopped();
- _dispatcher.setConnectionStopped(false);
-
- rejectMessagesForConsumerTag(consumerTag, true);
-
- if (stopped)
+ if (!stopped)
{
- _dispatcher.setConnectionStopped(stopped);
+ _dispatcher.setConnectionStopped(true);
}
+
+ // Reject messages on pre-receive queue
+ consumer.rollback();
+
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+
+ // Remove consumer from map.
+ deregisterConsumer(consumer);
+
+ _dispatcher.setConnectionStopped(stopped);
+
}
}
}
@@ -549,14 +596,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
suspendChannel(true);
}
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
-
if (_dispatcher != null)
{
_dispatcher.rollback();
}
+ _connection.getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+
if (!isSuspended)
{
suspendChannel(false);
@@ -663,14 +711,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
jmse = e;
}
}
- finally
+ if (jmse != null)
{
- if (jmse != null)
- {
- throw jmse;
- }
+ throw jmse;
}
-
}
@@ -835,6 +879,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.clearUnackedMessages();
}
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
@@ -844,11 +893,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false) // requeue
, BasicRecoverOkBody.class);
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
-
if (!isSuspended)
{
suspendChannel(false);
@@ -1223,35 +1267,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return (counter != null) && (counter.get() != 0);
}
-
- public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
+ public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
{
- declareExchange(name, type, getProtocolHandler());
+ declareExchange(name, type, getProtocolHandler(), nowait);
}
- public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
- getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
- {
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
- }
-
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
@@ -1261,7 +1287,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // durable
name, // exchange
false, // internal
- false, // nowait
+ nowait, // nowait
false, // passive
getTicket(), // ticket
type); // type
@@ -1874,15 +1900,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
+ startDistpatcherIfNecessary(false);
+ }
+
+ synchronized void startDistpatcherIfNecessary(boolean initiallyStopped)
+ {
if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
+ _dispatcher.setConnectionStopped(initiallyStopped);
_dispatcher.start();
}
else
{
- _dispatcher.setConnectionStopped(false);
+ _dispatcher.setConnectionStopped(initiallyStopped);
}
}
@@ -1910,7 +1942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler);
+ declareExchange(amqd, protocolHandler, false);
AMQShortString queueName = declareQueue(amqd, protocolHandler);
@@ -1950,12 +1982,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_destinationConsumerCount.remove(dest);
}
}
-
- //ensure we remove the messages from the consumer even if the dispatcher hasn't started
- if (_dispatcher == null)
- {
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- }// if the dispatcher is running we have to do the clean up in the Ok Handler.
}
}
@@ -2033,6 +2059,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void confirmConsumerCancelled(AMQShortString consumerTag)
{
+
+ // Remove the consumer from the map
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if (consumer != null)
{
@@ -2040,26 +2068,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
consumer.closeWhenNoMessages(true);
}
+
+ //Clean the Maps up first
+ //Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _logger.info("Dispatcher is not null");
+ }
else
{
- consumer.rollback();
+ _logger.info("Dispatcher is null so created stopped dispatcher");
+
+ startDistpatcherIfNecessary(true);
}
- }
- //Flush any pending messages for this consumerTag
- if (_dispatcher != null)
- {
- _dispatcher.rejectPending(consumerTag);
+ _dispatcher.rejectPending(consumer);
}
else
{
- rejectMessagesForConsumerTag(consumerTag, true);
+ _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
}
+
+
}
/*
- * I could have combined the last 3 methods, but this way it improves readability
- */
+ * I could have combined the last 3 methods, but this way it improves readability
+ */
private AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
@@ -2189,16 +2224,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Removing message from _queue:" + message);
+ _logger.debug("Removing message(" + System.identityHashCode(message) +
+ ") from _queue DT:" + message.getDeliverBody().deliveryTag);
}
messages.remove();
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message, requeue);
- _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+ }
}
else
{
@@ -2207,15 +2246,45 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+
+ public void rejectMessage(UnprocessedMessage message, boolean requeue)
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ }
+
+ rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ }
+
+ public void rejectMessage(AbstractJMSMessage message, boolean requeue)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
+ }
+ rejectMessage(message.getDeliveryTag(), requeue);
+
+ }
+
public void rejectMessage(long deliveryTag, boolean requeue)
{
- AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- deliveryTag,
- requeue);
+ if (_acknowledgeMode == CLIENT_ACKNOWLEDGE ||
+ _acknowledgeMode == SESSION_TRANSACTED)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting delivery tag:" + deliveryTag);
+ }
+ AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag,
+ requeue);
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index e9b914425a..9043faa80c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -109,9 +110,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
private int _outstanding;
- /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */
- private long _lastDeliveryTag;
-
/**
* Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
* number of msgs >= _prefetchHigh and disabled at < _prefetchLow
@@ -120,6 +118,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
+ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -432,6 +433,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing consumer:" + debugIdentity());
+ }
+
synchronized (_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
@@ -448,6 +454,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
_protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("CancelOk'd for consumer:" + debugIdentity());
+ }
+
}
catch (AMQException e)
{
@@ -456,11 +468,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- deregisterConsumer();
- _unacknowledgedDeliveryTags.clear();
+ //done in BasicCancelOK Handler
+ //deregisterConsumer();
if (_messageListener != null && _receiving.get())
{
- _logger.info("Interrupting thread: " + _receivingThread);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Interrupting thread: " + _receivingThread);
+ }
_receivingThread.interrupt();
}
}
@@ -616,7 +631,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- _lastDeliveryTag = msg.getDeliveryTag();
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
}
break;
}
@@ -625,10 +640,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
/** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeLastDelivered()
{
- if (_lastDeliveryTag > 0)
+ if (!_receivedDeliveryTags.isEmpty())
{
- _session.acknowledgeMessage(_lastDeliveryTag, true);
- _lastDeliveryTag = -1;
+ long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ lastDeliveryTag = _receivedDeliveryTags.poll();
+ }
+
+ _session.acknowledgeMessage(lastDeliveryTag, true);
}
}
@@ -738,43 +759,76 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void rollback()
{
+ clearUnackedMessages();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting received messages");
+ }
+ //rollback received but not committed messages
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ Long tag = _receivedDeliveryTags.poll();
+
+ if (tag != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+ }
+
+ _session.rejectMessage(tag, true);
+ }
+ }
+
+ //rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" +
+ "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
+
while (iterator.hasNext())
{
- Object o = iterator.next();
+ Object o = iterator.next();
if (o instanceof AbstractJMSMessage)
{
- _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+ _session.rejectMessage(((AbstractJMSMessage) o), true);
if (_logger.isTraceEnabled())
{
- _logger.trace("Rejected message" + o);
- iterator.remove();
+ _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
+ iterator.remove();
}
else
{
_logger.error("Queue contained a :" + o.getClass() +
" unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ iterator.remove();
}
}
if (_synchronousQueue.size() != 0)
{
_logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
+ rollback();
}
_synchronousQueue.clear();
}
}
+
+ public String debugIdentity()
+ {
+ return String.valueOf(_consumerTag);
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
index 9bd0205977..bd8177feb6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
@@ -28,27 +28,29 @@ import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-/**
- * @author Apache Software Foundation
- */
public class BasicCancelOkMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
- private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+ private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
+ private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+
+ public static BasicCancelOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private BasicCancelOkMethodHandler()
+ {
+ }
- public static BasicCancelOkMethodHandler getInstance()
- {
- return _instance;
- }
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- private BasicCancelOkMethodHandler()
- {
- }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag);
+ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
- {
- _logger.debug("New BasicCancelOk method received");
- BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
- }
+ protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index ddf79ec907..b176df87fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -30,13 +30,11 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
/**
- * This class contains everything needed to process a JMS message. It assembles the
- * deliver body, the content header and the content body/ies.
- *
- * Note that the actual work of creating a JMS message for the client code's use is done
- * outside of the MINA dispatcher thread in order to minimise the amount of work done in
- * the MINA dispatcher thread.
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
*
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
public class UnprocessedMessage
{
@@ -47,9 +45,7 @@ public class UnprocessedMessage
private final int _channelId;
private ContentHeaderBody _contentHeader;
- /**
- * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
- */
+ /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
@@ -74,9 +70,9 @@ public class UnprocessedMessage
{
final long payloadSize = body.payload.remaining();
- if(_bodies == null)
+ if (_bodies == null)
{
- if(payloadSize == getContentHeader().bodySize)
+ if (payloadSize == getContentHeader().bodySize)
{
_bodies = Collections.singletonList(body);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index b2940d73ae..8a0b5e7d84 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -58,6 +58,7 @@ public class StateWaiter implements StateListener
{
_logger.debug("State " + _state + " not achieved so waiting...");
_monitor.wait(TIME_OUT);
+ //fixme this won't cause the timeout to exit the loop. need to set _throwable
}
catch (InterruptedException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index 03e7d399ce..cb4ef01d25 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.client.util;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 338404a431..4667a2b3fa 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -73,7 +73,8 @@ public class RecoverTest extends TestCase
Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ // This is the default now
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -130,7 +131,8 @@ public class RecoverTest extends TestCase
Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ // This is the default now
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
index 3431c56783..51bbe7d0e6 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
@@ -109,6 +109,10 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
catch (AMQException e)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Exception occured was:" + e.getErrorCode());
+ }
assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
_connection = newConnection();
@@ -315,15 +319,15 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con
}
catch (JMSException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
catch (AMQException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
catch (URLSyntaxException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
new file mode 100644
index 0000000000..a56bae3d70
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+
+public class MessageRequeueTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class);
+
+ protected static AtomicInteger consumerIds = new AtomicInteger(0);
+ protected final Integer numTestMessages = 150;
+
+ protected final int consumeTimeout = 3000;
+
+ protected final String queue = "direct://amq.direct//queue";
+ protected String payload = "Message:";
+
+ protected final String BROKER = "vm://:1";
+ private boolean testReception = true;
+
+ private long[] receieved = new long[numTestMessages + 1];
+ private boolean passed=false;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queue, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (!passed)
+ {
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ }
+ TransportConnection.killVMBroker(1);
+ }
+
+ /** multiple consumers */
+ public void testDrain() throws JMSException, InterruptedException
+ {
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+
+ _logger.info("consuming queue " + queue);
+ Queue q = conn.getSession().createQueue(queue);
+
+ final MessageConsumer consumer = conn.getSession().createConsumer(q);
+ int messagesReceived = 0;
+
+ long messageLog[] = new long[numTestMessages + 1];
+
+ _logger.info("consuming...");
+ Message msg = consumer.receive(1000);
+ while (msg != null)
+ {
+ messagesReceived++;
+
+ long dt = ((AbstractJMSMessage) msg).getDeliveryTag();
+
+ int msgindex = msg.getIntProperty("index");
+ if (messageLog[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() +
+ ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+ "DT:" + dt +
+ "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ messageLog[msgindex] = dt;
+
+ //get Next message
+ msg = consumer.receive(1000);
+ }
+
+ conn.getSession().commit();
+ consumer.close();
+ assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
+
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ for (long b : messageLog)
+ {
+ if (b == 0 && index != 0) //delivery tag of zero shouldn't exist
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+
+ index++;
+ }
+ assertEquals(list.toString(), 0, failed);
+ _logger.info("consumed: " + messagesReceived);
+ conn.disconnect();
+ }
+
+ /** multiple consumers */
+ public void testTwoCompetingConsumers()
+ {
+ Consumer c1 = new Consumer();
+ Consumer c2 = new Consumer();
+ Consumer c3 = new Consumer();
+ Consumer c4 = new Consumer();
+
+ Thread t1 = new Thread(c1);
+ Thread t2 = new Thread(c2);
+ Thread t3 = new Thread(c3);
+ Thread t4 = new Thread(c4);
+
+ t1.start();
+// t2.start();
+// t3.start();
+// t4.start();
+
+ try
+ {
+ t1.join();
+ t2.join();
+ t3.join();
+ t4.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Uanble to join to Consumer theads");
+ }
+
+ _logger.info("consumer 1 count is " + c1.getCount());
+ _logger.info("consumer 2 count is " + c2.getCount());
+ _logger.info("consumer 3 count is " + c3.getCount());
+ _logger.info("consumer 4 count is " + c4.getCount());
+
+ Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount();
+
+ // Check all messages were correctly delivered
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ for (long b : receieved)
+ {
+ if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0)
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+ index++;
+ }
+ assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
+ assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
+ passed=true;
+ }
+
+ class Consumer implements Runnable
+ {
+ private Integer count = 0;
+ private Integer id;
+
+ public Consumer()
+ {
+ id = consumerIds.addAndGet(1);
+ }
+
+ public void run()
+ {
+ try
+ {
+ _logger.info("consumer-" + id + ": starting");
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+
+ _logger.info("consumer-" + id + ": connected, consuming...");
+ Message result;
+ do
+ {
+ result = conn.getNextMessage(queue, consumeTimeout);
+ if (result != null)
+ {
+
+ long dt = ((AbstractJMSMessage) result).getDeliveryTag();
+
+ if (testReception)
+ {
+ int msgindex = result.getIntProperty("index");
+ if (receieved[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() +
+ ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+ "DT:" + dt +
+ "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ receieved[msgindex] = dt;
+ }
+
+
+ count++;
+ if (count % 100 == 0)
+ {
+ _logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
+ }
+ }
+ }
+ while (result != null);
+
+ _logger.info("consumer-" + id + ": complete");
+ conn.disconnect();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public Integer getCount()
+ {
+ return count;
+ }
+
+ public Integer getId()
+ {
+ return id;
+ }
+ }
+
+
+ public class QpidClientConnection implements ExceptionListener
+ {
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection()
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(BROKER);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+ }
+
+
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ {
+ String virtualHost = "/test";
+ String brokerlist = "vm://:1";
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+
+ Connection conn = new AMQConnection(brokerUrl);
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
+
+ _logger.info("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ //
+ }
+
+ _logger.info("Receiving msg");
+ Message msg = consumer.receive();
+
+ assertNotNull("Message should not be null", msg);
+
+ _logger.info("Close Consumer");
+ consumer.close();
+
+ _logger.info("Close Connection");
+ conn.close();
+ }
+
+} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 07ef5f04d4..fb5ea58174 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -80,7 +80,8 @@ public class StreamMessageTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 0d75a6b968..2abc139ced 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -43,7 +43,8 @@ import javax.jms.TextMessage;
public class CommitRollbackTest extends TestCase
{
protected AMQConnection conn;
- protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected static int testMethod = 0;
protected String payload = "xyzzy";
private Session _session;
private MessageProducer _publisher;
@@ -57,6 +58,11 @@ public class CommitRollbackTest extends TestCase
{
super.setUp();
TransportConnection.createVMBroker(1);
+
+ testMethod++;
+ queue += testMethod;
+
+
newConnection();
}
@@ -84,7 +90,11 @@ public class CommitRollbackTest extends TestCase
TransportConnection.killVMBroker(1);
}
- /** PUT a text message, disconnect before commit, confirm it is gone. */
+ /**
+ * PUT a text message, disconnect before commit, confirm it is gone.
+ *
+ * @throws Exception On error
+ */
public void testPutThenDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -109,7 +119,11 @@ public class CommitRollbackTest extends TestCase
assertNull("test message was put and disconnected before commit, but is still present", result);
}
- /** PUT a text message, disconnect before commit, confirm it is gone. */
+ /**
+ * PUT a text message, disconnect before commit, confirm it is gone.
+ *
+ * @throws Exception On error
+ */
public void testPutThenCloseDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -140,6 +154,8 @@ public class CommitRollbackTest extends TestCase
/**
* PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
* session as producer
+ *
+ * @throws Exception On error
*/
public void testPutThenRollback() throws Exception
{
@@ -160,7 +176,11 @@ public class CommitRollbackTest extends TestCase
assertNull("test message was put and rolled back, but is still present", result);
}
- /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */
+ /**
+ * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection
+ *
+ * @throws Exception On error
+ */
public void testGetThenDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -194,6 +214,8 @@ public class CommitRollbackTest extends TestCase
/**
* GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
* same connection but different session as producer
+ *
+ * @throws Exception On error
*/
public void testGetThenCloseDisconnect() throws Exception
{
@@ -230,6 +252,8 @@ public class CommitRollbackTest extends TestCase
/**
* GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
* session to the producer
+ *
+ * @throws Exception On error
*/
public void testGetThenRollback() throws Exception
{
@@ -266,6 +290,8 @@ public class CommitRollbackTest extends TestCase
/**
* GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
* connection but different session as producer
+ *
+ * @throws Exception On error
*/
public void testGetThenCloseRollback() throws Exception
{
@@ -304,7 +330,11 @@ public class CommitRollbackTest extends TestCase
}
- /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */
+ /**
+ * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order
+ *
+ * @throws Exception On error
+ */
public void testSend2ThenRollback() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -339,37 +369,41 @@ public class CommitRollbackTest extends TestCase
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
-// assertTrue("session is not transacted", _session.getTransacted());
-// assertTrue("session is not transacted", _pubSession.getTransacted());
-//
-// _logger.info("sending two test messages");
-// _publisher.send(_pubSession.createTextMessage("1"));
-// _publisher.send(_pubSession.createTextMessage("2"));
-// _pubSession.commit();
-//
-// _logger.info("getting test message");
-// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
-//
-// _consumer.close();
-//
-// _consumer = _session.createConsumer(_jmsQueue);
-//
-// _logger.info("receiving result");
-// Message result = _consumer.receive(1000);
-// _logger.error("1:" + result);
-//// assertNotNull("test message was consumed and rolled back, but is gone", result);
-//// assertEquals("1" , ((TextMessage) result).getText());
-//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-//
-// result = _consumer.receive(1000);
-// _logger.error("2" + result);
-//// assertNotNull("test message was consumed and rolled back, but is gone", result);
-//// assertEquals("2", ((TextMessage) result).getText());
-//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
-//
-// result = _consumer.receive(1000);
-// _logger.error("3" + result);
-// assertNull("test message should be null:" + result, result);
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+ Message result = _consumer.receive(1000);
+
+ assertNotNull("Message received should not be null", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+
+
+ _logger.info("Closing Consumer");
+ _consumer.close();
+
+ _logger.info("Creating New consumer");
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ _logger.info("receiving result");
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNull("test message should be null:" + result, result);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 94cbb426e5..d994d4c141 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -62,69 +62,125 @@ public class TransactedTest extends TestCase
{
super.setUp();
TransportConnection.createVMBroker(1);
+ _logger.info("Create Connection");
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
+
+ _logger.info("Create Session");
session = con.createSession(true, Session.SESSION_TRANSACTED);
+ _logger.info("Create Q1");
queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ _logger.info("Create Q2");
queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
+ _logger.info("Create Consumer of Q1");
consumer1 = session.createConsumer(queue1);
- //Dummy just to create the queue.
+ //Dummy just to create the queue.
+ _logger.info("Create Consumer of Q2");
MessageConsumer consumer2 = session.createConsumer(queue2);
+ _logger.info("Close Consumer of Q2");
consumer2.close();
+
+ _logger.info("Create producer to Q2");
producer2 = session.createProducer(queue2);
+
+ _logger.info("Start Connection");
con.start();
+ _logger.info("Create prep connection");
prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
+
+ _logger.info("Create prep session");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ _logger.info("Create prep producer to Q1");
prepProducer1 = prepSession.createProducer(queue1);
+
+ _logger.info("Create prep connection start");
prepCon.start();
- //add some messages
- prepProducer1.send(prepSession.createTextMessage("A"));
- prepProducer1.send(prepSession.createTextMessage("B"));
- prepProducer1.send(prepSession.createTextMessage("C"));
+ _logger.info("Create test connection");
testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
+ _logger.info("Create test session");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create test consumer of q2");
testConsumer2 = testSession.createConsumer(queue2);
-
}
protected void tearDown() throws Exception
{
+ _logger.info("Close connection");
con.close();
+ _logger.info("Close test connection");
testCon.close();
+ _logger.info("Close prep connection");
prepCon.close();
+ _logger.info("Kill broker");
TransportConnection.killAllVMBrokers();
super.tearDown();
}
public void testCommit() throws Exception
{
+ //add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
//send and receive some messages
+ _logger.info("Send X to Q2");
producer2.send(session.createTextMessage("X"));
+ _logger.info("Send Y to Q2");
producer2.send(session.createTextMessage("Y"));
+ _logger.info("Send Z to Q2");
producer2.send(session.createTextMessage("Z"));
+
+
+ _logger.info("Read A from Q1");
expect("A", consumer1.receive(1000));
+ _logger.info("Read B from Q1");
expect("B", consumer1.receive(1000));
+ _logger.info("Read C from Q1");
expect("C", consumer1.receive(1000));
//commit
+ _logger.info("session commit");
session.commit();
+ _logger.info("Start test Connection");
testCon.start();
+
//ensure sent messages can be received and received messages are gone
+ _logger.info("Read X from Q2");
expect("X", testConsumer2.receive(1000));
+ _logger.info("Read Y from Q2");
expect("Y", testConsumer2.receive(1000));
+ _logger.info("Read Z from Q2");
expect("Z", testConsumer2.receive(1000));
+ _logger.info("create test session on Q1");
testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Read null from Q1");
assertTrue(null == testConsumer1.receive(1000));
+ _logger.info("Read null from Q2");
assertTrue(null == testConsumer2.receive(1000));
}
public void testRollback() throws Exception
{
+ //add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
+ //Quick sleep to ensure all three get pre-fetched
+ Thread.sleep(500);
+
_logger.info("Sending X Y Z");
producer2.send(session.createTextMessage("X"));
producer2.send(session.createTextMessage("Y"));
@@ -140,9 +196,9 @@ public class TransactedTest extends TestCase
_logger.info("Receiving A B C");
//ensure sent messages are not visible and received messages are requeued
- expect("A", consumer1.receive(1000));
- expect("B", consumer1.receive(1000));
- expect("C", consumer1.receive(1000));
+ expect("A", consumer1.receive(1000), true);
+ expect("B", consumer1.receive(1000), true);
+ expect("C", consumer1.receive(1000), true);
_logger.info("Starting new connection");
testCon.start();
@@ -152,20 +208,22 @@ public class TransactedTest extends TestCase
assertTrue(null == testConsumer2.receive(1000));
session.commit();
+
+ _logger.info("Testing we have no messages left after commit");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testResendsMsgsAfterSessionClose() throws Exception
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
- Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
- Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(queue3);
_logger.info("Sending four messages");
@@ -176,65 +234,77 @@ public class TransactedTest extends TestCase
producerSession.commit();
-
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
+ assertNotNull(tm);
+ assertEquals("msg1", tm.getText());
- tm.acknowledge();
consumerSession.commit();
- _logger.info("Received and acknowledged first message");
+ _logger.info("Received and committed first message");
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+
_logger.info("Received all four messages. Closing connection with three outstanding messages");
consumerSession.close();
- consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
consumer = consumerSession.createConsumer(queue3);
// no ack for last three messages so when I call recover I expect to get three messages back
-
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg2", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg3", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg4", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+
+ _logger.info("Received redelivery of three messages. Committing");
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
consumerSession.commit();
- _logger.info("Calling acknowledge with no outstanding messages");
- // all acked so no messages to be delivered
+ _logger.info("Called commit");
- tm = (TextMessage) consumer.receiveNoWait();
+ tm = (TextMessage) consumer.receive(1000);
assertNull(tm);
+
_logger.info("No messages redelivered as is expected");
con.close();
con2.close();
-
}
-
private void expect(String text, Message msg) throws JMSException
{
+ expect(text, msg, false);
+ }
+
+ private void expect(String text, Message msg, boolean requeued) throws JMSException
+ {
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText());
+ assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered());
}
public static junit.framework.Test suite()
diff --git a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
index 70209cd2a3..86cde3cee7 100644
--- a/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
+++ b/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
@@ -37,7 +37,7 @@ public class SimpleClusterTest extends TestCase
AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
System.out.println("Session created");
- session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"));
+ session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true);
System.out.println("Exchange declared");
con.close();
System.out.println("Connection closed");
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
index ed244396bf..379f7feb4f 100644
--- a/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
+++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
@@ -100,7 +100,7 @@ public final class AMQConstant
public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
index cdf686b4cb..883d5018cd 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
@@ -41,6 +41,11 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
return super.size() + _messageHeadSize.get();
}
+ public int headSize()
+ {
+ return _messageHeadSize.get();
+ }
+
@Override
public E poll()
{
@@ -50,10 +55,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
}
else
{
- _logger.debug("Providing item from message head");
-
E e = _messageHead.poll();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Providing item(" + e + ")from message head");
+ }
+
+
if (e != null)
{
_messageHeadSize.decrementAndGet();
@@ -159,8 +168,12 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
}
else
{
- _logger.debug("Providing item from message head");
- return _messageHead.peek();
+ E o = _messageHead.peek();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Peeking item (" + o + ") from message head");
+ }
+ return o;
}
}
@@ -186,7 +199,10 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
public boolean pushHead(E o)
{
- _logger.debug("Adding item to head of queue");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding item(" + o + ") to head of queue");
+ }
if (_messageHead.offer(o))
{
_messageHeadSize.incrementAndGet();
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index c8271f1549..87491ed3d3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -67,7 +67,8 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");