summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
commitc4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch)
tree421d85cf41bf382bab618587298d9d6bef825bdc /java
parent685ad5615e73f02f76f69841162fb9aa126892d2 (diff)
downloadqpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-403 Implement Basic.Reject QPID-410 Queue Browsers should use not acknowledge messages. ------------------------------------- Broker TxAck - Added comment and fixed white space UnacknowledgedMessage - Added comment for messageDecrement AMQChannel - Added extra debugging. + Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction. + Updated message reference counting. So it is in terms of queues don't increment when giving to client. BasicCancelMethodHandler - Added Debug log. BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging. BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging AMQPFastProtocolHandler - moved error log to before session.write AMQMessage - Added additional debug via debugIdentity() and comments AMQQueue - Decoupled reference counting from dequeue operation. ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer. + On Close ensured that it is only called once. + Had problem where closing browser was causing two CancelOk frames to be sent back to client. RequiredDeliveryException - Added comment to explain incrementReference LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here. NonTransactionalContext - Removed incrementReference on deliver + - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up. TxnBuffer - Added debug logging. Client ------ AMQQueueBrowser - Added comments AMQSession - Added comments and debug + Updated to cause closed consumer to reject messages rather than receive them. + Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback BasicMessageConsumer - Added trace level debuging on close calls + Forced noConsume-rs to use NO_ACK + added more logging Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first. ChannelCloseOkMethodHandler - updated comment AMQProtocolSession - Update comments,whitespace TransportConnection - removed static block FlowControllingBlockingQueue - Added isEmpty() Method PropertyValueTest - Added VM Broker setup + Updated test to run once and 50 times to pull out delivery tag problems that were occuring. + Adjusted logging level to be more helpful. moved some info down to trace and debug. MessageRequeueTest - Moved QpidClientConnection its own file. + Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1. ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator(). Added QueueBrowserTest to system tests to test QueueBrowsering. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java114
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java109
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java22
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java106
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java130
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/Closeable.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java71
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java314
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java268
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java31
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java117
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java150
31 files changed, 1191 insertions, 515 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 5dd6619cff..1ebe5fa0a2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -319,6 +319,25 @@ public class AMQChannel
public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ _log.debug(message);
+
+ return true;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+ }
+
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
@@ -342,9 +361,23 @@ public class AMQChannel
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
{
- _log.info("Unsubscribing all consumers on channel " + toString());
+ if (_log.isInfoEnabled())
+ {
+ if (!_consumerTag2QueueMap.isEmpty())
+ {
+ _log.info("Unsubscribing all consumers on channel " + toString());
+ }
+ else
+ {
+ _log.info("No consumers to unsubscribe on channel " + toString());
+ }
+ }
for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet())
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+ }
me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
}
_consumerTag2QueueMap.clear();
@@ -369,7 +402,11 @@ public class AMQChannel
}
else
{
- _log.debug(debugIdentity() + " Adding unacked message(" + deliveryTag + ") with a queue(" + queue + "):" + message.debugIdentity());
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
+ ") with a queue(" + queue + ") for " + consumerTag);
+ }
}
}
@@ -395,25 +432,38 @@ public class AMQChannel
*/
public void requeue() throws AMQException
{
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Requeuing for " + toString());
+ }
+
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ if (_log.isDebugEnabled())
+ {
+ _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages.");
+ }
// Deliver these messages out of the transaction as their delivery was never
// part of the transaction only the receive.
- TransactionalContext deliveryContext;
- if (!(_txnContext instanceof NonTransactionalContext))
+ TransactionalContext deliveryContext = null;
+
+ if (!messagesToBeDelivered.isEmpty())
{
- if (_nonTransactedContext == null)
+ if (!(_txnContext instanceof NonTransactionalContext))
{
- _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
- }
+// if (_nonTransactedContext == null)
+ {
+ _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
+ _returnMessages, _browsedAcks);
+ }
- deliveryContext = _nonTransactedContext;
- }
- else
- {
- deliveryContext = _txnContext;
+ deliveryContext = _nonTransactedContext;
+ }
+ else
+ {
+ deliveryContext = _txnContext;
+ }
}
@@ -421,6 +471,10 @@ public class AMQChannel
{
if (unacked.queue != null)
{
+ // Ensure message is released for redelivery
+ unacked.message.release();
+
+ // Mark message redelivered
unacked.message.setRedelivered(true);
// Deliver Message
@@ -459,7 +513,7 @@ public class AMQChannel
TransactionalContext deliveryContext;
if (!(_txnContext instanceof NonTransactionalContext))
{
- if (_nonTransactedContext == null)
+// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
_returnMessages, _browsedAcks);
@@ -472,13 +526,12 @@ public class AMQChannel
deliveryContext = _txnContext;
}
-
if (unacked.queue != null)
{
//Redeliver the messages to the front of the queue
deliveryContext.deliver(unacked.message, unacked.queue, true);
-
- unacked.message.decrementReference(_storeContext);
+ //Deliver increments the message count but we have already deliverted this once so don't increment it again
+ // this was because deliver did an increment changed this.
}
else
{
@@ -489,7 +542,6 @@ public class AMQChannel
//
// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
//
-// unacked.message.decrementReference(_storeContext);
}
}
else
@@ -656,15 +708,16 @@ public class AMQChannel
}
sub.addToResendQueue(msg);
_unacknowledgedMessageMap.remove(message.deliveryTag);
- // Don't decrement as we are bypassing the normal deliver which increments
- // this is why there is a decrement on the Requeue as deliver will increment.
- // msg.decrementReference(_storeContext);
}
} // sync(sub.getSendLock)
}
else
{
- _log.info("DeliveredSubscription not recorded so just requeueing to prevent loss");
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss");
+ }
//move this message to requeue
msgToRequeue.add(message);
}
@@ -706,7 +759,6 @@ public class AMQChannel
deliveryContext.deliver(message.message, message.queue, true);
_unacknowledgedMessageMap.remove(message.deliveryTag);
- message.message.decrementReference(_storeContext);
}
}
@@ -760,8 +812,18 @@ public class AMQChannel
{
synchronized (_unacknowledgedMessageMap.getLock())
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked (PreAck) Size:" + _unacknowledgedMessageMap.size());
+ }
+
_unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
checkSuspension();
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Unacked (PostAck) Size:" + _unacknowledgedMessageMap.size());
+ }
+
}
}
@@ -775,12 +837,6 @@ public class AMQChannel
return _unacknowledgedMessageMap;
}
- public void addUnacknowledgedBrowsedMessage(AMQMessage msg, long deliveryTag, AMQShortString consumerTag, AMQQueue queue)
- {
- _browsedAcks.add(deliveryTag);
- addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- }
-
private void checkSuspension()
{
boolean suspend;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 820f0122f5..fb16267d97 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -37,6 +37,10 @@ public abstract class RequiredDeliveryException extends AMQException
{
super(message);
_amqMessage = payload;
+ // Increment the reference as this message is in the routing phase
+ // and so will have the ref decremented as routing fails.
+ // we need to keep this message around so we can return it in the
+ // handler. So increment here.
payload.incrementReference();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index c987c12154..aac9408247 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -101,6 +101,8 @@ public class TxAck implements TxnOp
for (UnacknowledgedMessage msg : _unacked)
{
msg.restoreTransientMessageData();
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
}
}
@@ -124,7 +126,7 @@ public class TxAck implements TxnOp
_map.remove(_unacked);
for (UnacknowledgedMessage msg : _unacked)
{
- msg.clearTransientMessageData();
+ msg.clearTransientMessageData();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index 940b5b2bf1..b8c5e821f7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -39,7 +39,6 @@ public class UnacknowledgedMessage
this.message = message;
this.consumerTag = consumerTag;
this.deliveryTag = deliveryTag;
- message.incrementReference();
}
public String toString()
@@ -63,6 +62,7 @@ public class UnacknowledgedMessage
{
message.dequeue(storeContext, queue);
}
+ //if the queue is null then the message is waiting to be acked, but has been removed.
message.decrementReference(storeContext);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
index 7d18043f5c..8bab96a11b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
@@ -29,9 +29,12 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
{
+ private static final Logger _log = Logger.getLogger(BasicCancelMethodHandler.class);
+
private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
public static BasicCancelMethodHandler getInstance()
@@ -55,6 +58,12 @@ public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicC
throw body.getChannelNotFoundException(evt.getChannelId());
}
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicCancel: for:" + body.consumerTag +
+ " nowait:" + body.nowait);
+ }
+
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
if (!body.nowait)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index da61f2ffd5..56eae279dc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -67,12 +69,22 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
else
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("BasicConsume: from '" + body.queue +
+ "' for:" + body.consumerTag +
+ " nowait:" + body.nowait +
+ " args:" + body.arguments);
+ }
AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.queue);
if (queue == null)
{
- _log.info("No queue for '" + body.queue + "'");
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No queue for '" + body.queue + "'");
+ }
if (body.queue != null)
{
String msg = "No such queue, '" + body.queue + "'";
@@ -105,14 +117,34 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
catch (org.apache.qpid.AMQInvalidArgumentException ise)
{
- _log.info("Closing connection due to invalid selector");
- throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ _log.debug("Closing connection due to invalid selector");
+ // Why doesn't this ChannelException work.
+// throw body.getChannelException(AMQConstant.INVALID_ARGUMENT, ise.getMessage());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+ BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+ AMQConstant.INVALID_ARGUMENT.getCode(), // replyCode
+ new AMQShortString(ise.getMessage()))); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.consumerTag + "'");
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // If the above doesn't work then perhaps this is wrong too.
+// throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+// "Non-unique consumer tag, '" + body.consumerTag + "'");
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 4e77a5e8b9..14687c40ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -52,13 +52,13 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
int channelId = evt.getChannelId();
- if (_logger.isTraceEnabled())
- {
- _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
- ": Requeue:" + evt.getMethod().requeue +
-// ": Resend:" + evt.getMethod().resend +
- " on channel:" + channelId);
- }
+// if (_logger.isDebugEnabled())
+// {
+// _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
+// ": Requeue:" + evt.getMethod().requeue +
+//// ": Resend:" + evt.getMethod().resend +
+// " on channel:" + channelId);
+// }
AMQChannel channel = session.getChannel(channelId);
@@ -67,9 +67,9 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
throw evt.getMethod().getChannelNotFoundException(channelId);
}
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting:" + evt.getMethod().deliveryTag +
+ _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
": Requeue:" + evt.getMethod().requeue +
// ": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 777784ca30..1f4f1f9221 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -51,8 +51,11 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
{
AMQProtocolSession session = stateManager.getProtocolSession();
ChannelCloseBody body = evt.getMethod();
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
+ " and method " + body.methodId);
+ }
int channelId = evt.getChannelId();
AMQChannel channel = session.getChannel(channelId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index 21da03d226..b086cad67f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -30,7 +30,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
{
private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
@@ -49,8 +49,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
{
AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + session);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
+ body.replyText + " for " + session);
+ }
try
{
session.closeSession();
@@ -62,7 +65,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 03c7051aac..d8b7814d31 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -176,6 +176,8 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
else
{
+ _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
+
// Be aware of possible changes to parameter order as versions change.
protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
session.getProtocolMajorVersion(),
@@ -185,7 +187,6 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
200, // replyCode
new AMQShortString(throwable.getMessage()) // replyText
));
- _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 6d375c89fe..cdf316f2d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -45,9 +45,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * Combines the information that make up a deliverable message into a more manageable form.
- */
+/** Combines the information that make up a deliverable message into a more manageable form. */
public class AMQMessage
{
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -92,9 +90,10 @@ public class AMQMessage
return _taken.get();
}
+ private final int hashcode = System.identityHashCode(this);
public String debugIdentity()
{
- return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + ")";
+ return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
/**
@@ -206,7 +205,7 @@ public class AMQMessage
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created with id " + messageId);
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
}
}
@@ -363,7 +362,7 @@ public class AMQMessage
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " incremented to " + _referenceCount + " " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+ _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
}
@@ -374,6 +373,7 @@ public class AMQMessage
*
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
+ * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -387,9 +387,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count on message " + _messageId + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
-
-
+ _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
// must check if the handle is null since there may be cases where we decide to throw away a message
@@ -410,7 +408,7 @@ public class AMQMessage
{
if (_log.isDebugEnabled())
{
- _log.debug("Ref count is now " + _referenceCount + " for message id " + _messageId + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(0, 4));
+ _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
if (_referenceCount.get() < 0)
{
Thread.dumpStack();
@@ -418,7 +416,7 @@ public class AMQMessage
}
if (_referenceCount.get() < 0)
{
- throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
}
}
}
@@ -459,7 +457,10 @@ public class AMQMessage
public void release()
{
- _log.trace("Releasing Message:" + debugIdentity());
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Releasing Message:" + debugIdentity());
+ }
_taken.set(false);
_takenBySubcription = null;
}
@@ -572,7 +573,7 @@ public class AMQMessage
List<AMQQueue> destinationQueues = _transientMessageData.getDestinationQueues();
if (_log.isDebugEnabled())
{
- _log.debug("Delivering message " + _messageId + " to " + destinationQueues);
+ _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
}
try
{
@@ -589,6 +590,8 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
+ //Increment the references to this message for each queue delivery.
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -596,6 +599,7 @@ public class AMQMessage
finally
{
destinationQueues.clear();
+ // Remove refence for routing process . Reference count should now == delivered queue count
decrementReference(storeContext);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 7c2fe73386..78f144703b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -516,7 +516,7 @@ public class AMQQueue implements Managable, Comparable
{
if (_logger.isInfoEnabled())
{
- _logger.warn("Auto-deleteing queue:" + this);
+ _logger.info("Auto-deleteing queue:" + this);
}
autodelete();
// we need to manually fire the event to the removed subscription (which was the last one left for this
@@ -624,7 +624,6 @@ public class AMQQueue implements Managable, Comparable
try
{
msg.dequeue(storeContext, this);
- msg.decrementReference(storeContext);
}
catch (MessageCleanupException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 87868f0b25..6122d191f8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -383,6 +383,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
+ /**
+ This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -392,13 +395,14 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
AMQMessage message = messages.peek();
- while (message != null && ((sub != null && sub.isBrowser()) || message.taken(sub)))
+ //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
+ while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
assert removed == message;
-
+
_totalMessageSize.addAndGet(-message.getSize());
if (_log.isTraceEnabled())
@@ -494,7 +498,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_extraMessages.decrementAndGet();
}
- else if (messageQueue == sub.getPreDeliveryQueue())
+ else if (messageQueue == sub.getPreDeliveryQueue() && !sub.isBrowser())
{
if (_log.isInfoEnabled())
{
@@ -695,7 +699,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() +
+ _log.debug(debugIdentity() + " Message(" + msg.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
System.identityHashCode(s));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 20033daac7..d3578d39e8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -256,10 +256,10 @@ public class SubscriptionImpl implements Subscription
// We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
// received the message. If it is lost in transit that is not important.
- if (_acks)
- {
- channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
- }
+// if (_acks)
+// {
+// channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+// }
if (_sendLock.get())
{
@@ -273,41 +273,49 @@ public class SubscriptionImpl implements Subscription
private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
throws AMQException
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
+ try
+ { // if we do not need to wait for client acknowledgements
+ // we can decrement the reference count immediately.
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
+ // By doing this _before_ the send we ensure that it
+ // doesn't get sent if it can't be dequeued, preventing
+ // duplicate delivery on recovery.
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
- if (!_acks)
- {
- if (_logger.isDebugEnabled())
+ // The send may of course still fail, in which case, as
+ // the message is unacked, it will be lost.
+ if (!_acks)
{
- _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ queue.dequeue(storeContext, msg);
}
- queue.dequeue(storeContext, msg);
- }
- synchronized (channel)
- {
- long deliveryTag = channel.getNextDeliveryTag();
- if (_sendLock.get())
+ synchronized (channel)
{
- _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
- }
+ long deliveryTag = channel.getNextDeliveryTag();
- if (_acks)
- {
- channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
- msg.decrementReference(storeContext);
- }
+ if (_sendLock.get())
+ {
+ _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
+ }
- protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+ if (_acks)
+ {
+ channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+
+ protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
+
+ }
+ }
+ finally
+ {
//Only set delivered if it actually was writen successfully..
- // using a try->finally would set it even if an error occured.
+ // using a try->finally would set it even if an error occured.
+ // Is this what we want?
+
msg.setDeliveredToConsumer();
}
}
@@ -461,14 +469,25 @@ public class SubscriptionImpl implements Subscription
public void close()
{
+ boolean closed = false;
synchronized (_sendLock)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Setting SendLock true");
+ _logger.debug("Setting SendLock true:" + debugIdentity());
+ }
+
+ closed = _sendLock.getAndSet(true);
+ }
+
+ if (closed)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Called close() on a closed subscription");
}
- _sendLock.set(true);
+ return;
}
if (_logger.isInfoEnabled())
@@ -488,16 +507,36 @@ public class SubscriptionImpl implements Subscription
//remove references in PDQ
if (_messages != null)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Clearing PDQ (" + debugIdentity() + "):" + this);
+ }
+
_messages.clear();
}
+ }
+
+ private void autoclose()
+ {
+ close();
if (_autoClose && !_sentClose)
{
- _logger.info("Closing autoclose subscription:" + this);
+ _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
+
ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
-
_sentClose = true;
+
+ //fixme JIRA do this better
+ try
+ {
+ channel.unsubscribeConsumer(protocolSession, consumerTag);
+ }
+ catch (AMQException e)
+ {
+ // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+ }
}
}
@@ -590,7 +629,7 @@ public class SubscriptionImpl implements Subscription
{
if (_messages.isEmpty())
{
- close();
+ autoclose();
return null;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index e5cce672f6..cf0da55f2a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -100,7 +100,7 @@ public class LocalTransactionalContext implements TransactionalContext
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
- message.incrementReference();
+// message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
/*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 19146da22e..181dfa3a80 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -93,7 +93,6 @@ public class NonTransactionalContext implements TransactionalContext
{
try
{
- message.incrementReference();
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
@@ -128,6 +127,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.debug("Discarding message: " + message.message.getMessageId());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
}
else
@@ -160,6 +161,8 @@ public class NonTransactionalContext implements TransactionalContext
{
_log.debug("Discarding message: " + msg.message.getMessageId());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
}
else
@@ -181,7 +184,22 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
_channel.getChannelId());
}
- msg.discard(_storeContext);
+
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ msg.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+
if (_log.isDebugEnabled())
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
index d04b93a469..339ca8ae1a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
@@ -27,10 +27,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
-/**
- * Holds a list of TxnOp instance representing transactional
- * operations.
- */
+/** Holds a list of TxnOp instance representing transactional operations. */
public class TxnBuffer
{
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
@@ -42,6 +39,11 @@ public class TxnBuffer
public void commit(StoreContext context) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Committing " + _ops.size() + " ops to commit.:" + _ops.toArray());
+ }
+
if (prepare(context))
{
for (TxnOp op : _ops)
@@ -64,7 +66,7 @@ public class TxnBuffer
catch (Exception e)
{
//compensate previously prepared ops
- for(int j = 0; j < i; j++)
+ for (int j = 0; j < i; j++)
{
_ops.get(j).undoPrepare();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 053d380129..4662f80c5b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -49,6 +49,7 @@ public class AMQQueueBrowser implements QueueBrowser
_session = session;
_queue = queue;
_messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector;
+ // Create Consumer to verify message selector.
BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
consumer.close();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 61143eee69..184bc44912 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -25,6 +25,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -294,8 +295,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") +
- ": Currently " + (currently ? "Started" : "Stopped"));
+ _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") +
+ ": Currently " + (currently ? "Stopped" : "Started"));
}
}
return currently;
@@ -307,22 +308,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
- if (consumer == null)
+ if (consumer == null || consumer.isClosed())
{
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
- "[" + message.getDeliverBody().deliveryTag + "] from queue "
- + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)...");
+ if (consumer == null)
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+ "[" + message.getDeliverBody().deliveryTag + "] from queue " +
+ message.getDeliverBody().consumerTag +
+ " )without a handler - rejecting(requeue)...");
+ }
+ else
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+ "[" + message.getDeliverBody().deliveryTag + "] from queue " +
+ " consumer(" + consumer.debugIdentity() +
+ ") is closed rejecting(requeue)...");
+ }
}
rejectMessage(message, true);
}
else
{
-
consumer.notifyMessage(message, _channelId);
-
}
}
}
@@ -354,7 +364,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (BasicMessageConsumer consumer : _consumers.values())
{
- consumer.rollback();
+ if (!consumer.isNoConsume())
+ {
+ consumer.rollback();
+ }
+ else
+ {
+ // should perhaps clear the _SQ here.
+ //consumer._synchronousQueue.clear();
+ consumer.clearReceiveQueue();
+ }
+
+
}
setConnectionStopped(isStopped);
@@ -379,8 +400,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Reject messages on pre-dispatch queue
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- // Remove consumer from map.
- deregisterConsumer(consumer);
+ // closeConsumer
+ consumer.markClosed();
_dispatcher.setConnectionStopped(stopped);
@@ -624,6 +645,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void close(long timeout) throws JMSException
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ }
+
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
synchronized (_connection.getFailoverMutex())
@@ -2063,26 +2089,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Remove the consumer from the map
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if (consumer != null)
- {
- if (consumer.isAutoClose())
+ {
+// fixme this isn't right.. needs to check if _queue contains data for this consumer
+ if (consumer.isAutoClose())// && _queue.isEmpty())
{
consumer.closeWhenNoMessages(true);
}
- //Clean the Maps up first
- //Flush any pending messages for this consumerTag
- if (_dispatcher != null)
+ if (!consumer.isNoConsume())
{
- _logger.info("Dispatcher is not null");
+ //Clean the Maps up first
+ //Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _logger.info("Dispatcher is not null");
+ }
+ else
+ {
+ _logger.info("Dispatcher is null so created stopped dispatcher");
+
+ startDistpatcherIfNecessary(true);
+ }
+
+ _dispatcher.rejectPending(consumer);
}
else
{
- _logger.info("Dispatcher is null so created stopped dispatcher");
+ //Just close the consumer
+ //fixme the CancelOK is being processed before the arriving messages..
+ // The dispatcher is still to process them so the server sent in order but the client
+ // has yet to receive before the close comes in.
- startDistpatcherIfNecessary(true);
+// consumer.markClosed();
}
-
- _dispatcher.rejectPending(consumer);
}
else
{
@@ -2217,7 +2256,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
{
Iterator messages = _queue.iterator();
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag +
+ ") (PDispatchQ) requeue:" + requeue);
+ if (messages.hasNext())
+ {
+ _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
+ }
+ else
+ {
+ _logger.info("No messages in _queue to reject");
+ }
+ }
while (messages.hasNext())
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
@@ -2239,10 +2291,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
}
- else
- {
- _logger.error("Pruned pending message for consumer:" + consumerTag);
- }
}
}
@@ -2250,9 +2298,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
- if (_logger.isDebugEnabled())
+ if (_logger.isTraceEnabled())
{
- _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
}
rejectMessage(message.getDeliverBody().deliveryTag, requeue);
@@ -2260,9 +2308,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(AbstractJMSMessage message, boolean requeue)
{
- if (_logger.isDebugEnabled())
+ if (_logger.isTraceEnabled())
{
- _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
+ _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
}
rejectMessage(message.getDeliveryTag(), requeue);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9043faa80c..73010ce517 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.util.Iterator;
import java.util.List;
+import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -118,7 +119,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
+ /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
/**
@@ -135,6 +136,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean _closeWhenNoMessages;
private boolean _noConsume;
+ private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
@@ -157,6 +159,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
+
+ //Force queue browsers not to use acknowledge modes.
+ if (_noConsume)
+ {
+ _acknowledgeMode = Session.NO_ACKNOWLEDGE;
+ }
}
public AMQDestination getDestination()
@@ -433,6 +441,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
+ //synchronized (_closed)
+
if (_logger.isInfoEnabled())
{
_logger.info("Closing consumer:" + debugIdentity());
@@ -442,6 +452,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (!_closed.getAndSet(true))
{
+ if (_logger.isTraceEnabled())
+ {
+ if (_closedStack != null)
+ {
+ _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
+ }
+ }
if (sendClose)
{
// TODO: Be aware of possible changes to parameter order as versions change.
@@ -467,9 +489,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new JMSException("Error closing consumer: " + e);
}
}
+ else
+ {
+// //fixme this probably is not right
+// if (!isNoConsume())
+ { //done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
+ }
+ }
- //done in BasicCancelOK Handler
- //deregisterConsumer();
if (_messageListener != null && _receiving.get())
{
if (_logger.isInfoEnabled())
@@ -488,7 +516,23 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void markClosed()
{
- _closed.set(true);
+// synchronized (_closed)
+ {
+ _closed.set(true);
+
+ if (_logger.isTraceEnabled())
+ {
+ if (_closedStack != null)
+ {
+ _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ }
+ }
+ }
deregisterConsumer();
}
@@ -520,11 +564,24 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
- jmsMessage.setConsumer(this);
+// synchronized (_closed)
+ {
+// if (!_closed.get())
+ {
+
+ jmsMessage.setConsumer(this);
- preDeliver(jmsMessage);
+ preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage, channelId);
+ }
+// else
+// {
+// _logger.error("MESSAGE REJECTING!");
+// _session.rejectMessage(jmsMessage, true);
+// //_logger.error("MESSAGE JUST DROPPED!");
+// }
+ }
}
catch (Exception e)
{
@@ -551,9 +608,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
//for an application to alter an installed listener while the session is started
- preApplicationProcessing(jmsMessage);
- getMessageListener().onMessage(jmsMessage);
- postDeliver(jmsMessage);
+// synchronized (_closed)
+ {
+// if (!_closed.get())
+ {
+
+ preApplicationProcessing(jmsMessage);
+ getMessageListener().onMessage(jmsMessage);
+ postDeliver(jmsMessage);
+ }
+ }
}
else
{
@@ -649,14 +713,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
lastDeliveryTag = _receivedDeliveryTags.poll();
}
+ assert _receivedDeliveryTags.isEmpty();
+
_session.acknowledgeMessage(lastDeliveryTag, true);
}
}
void notifyError(Throwable cause)
{
- _closed.set(true);
-
+// synchronized (_closed)
+ {
+ _closed.set(true);
+ if (_logger.isTraceEnabled())
+ {
+ if (_closedStack != null)
+ {
+ _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " previously" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ }
+ }
+ }
//QPID-293 can "request redelivery of this error through dispatcher"
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
@@ -761,14 +841,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
clearUnackedMessages();
- if (_logger.isDebugEnabled())
+ if (!_receivedDeliveryTags.isEmpty())
{
- _logger.debug("Rejecting received messages");
+ _logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
//rollback received but not committed messages
while (!_receivedDeliveryTags.isEmpty())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" +
+ "for consumer with tag:" + _consumerTag);
+ }
+
Long tag = _receivedDeliveryTags.poll();
if (tag != null)
@@ -782,12 +868,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
+ }
+ }
+
//rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" +
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" +
"for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
@@ -821,7 +915,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
rollback();
}
- _synchronousQueue.clear();
+ clearReceiveQueue();
}
}
@@ -831,4 +925,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return String.valueOf(_consumerTag);
}
+ public void clearReceiveQueue()
+ {
+ _synchronousQueue.clear();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
index 6593f2c254..d246dc3931 100644
--- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java
+++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
@@ -25,20 +25,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-/**
- * Provides support for orderly shutdown of an object.
- */
+/** Provides support for orderly shutdown of an object. */
public abstract class Closeable
{
/**
- * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing
- * access to this flag would mean have a synchronized block in every method.
+ * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing access to this
+ * flag would mean have a synchronized block in every method.
*/
protected final AtomicBoolean _closed = new AtomicBoolean(false);
protected void checkNotClosed() throws JMSException
{
- if (_closed.get())
+ if (isClosed())
{
throw new IllegalStateException("Object " + toString() + " has been closed");
}
@@ -46,7 +44,10 @@ public abstract class Closeable
public boolean isClosed()
{
- return _closed.get();
+// synchronized (_closed)
+ {
+ return _closed.get();
+ }
}
public abstract void close() throws JMSException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
index 794071cc34..0826deb2f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
@@ -42,6 +42,6 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
- //todo this should do the closure
+ //todo this should do the local closure
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 19767b6575..e875b4dca8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -51,10 +51,8 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.AMQConstant;
/**
- * Wrapper for protocol session that provides type-safe access to session attributes.
- * <p/>
- * The underlying protocol session is still available but clients should not
- * use it to obtain session attributes.
+ * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
+ * session is still available but clients should not use it to obtain session attributes.
*/
public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
@@ -78,27 +76,23 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected WriteFuture _lastWriteFuture;
/**
- * The handler from which this session was created and which is used to handle protocol events.
- * We send failover events to the handler.
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
*/
protected final AMQProtocolHandler _protocolHandler;
- /**
- * Maps from the channel id to the AMQSession that it represents.
- */
+ /** Maps from the channel id to the AMQSession that it represents. */
protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
/**
- * Maps from a channel id to an unprocessed message. This is used to tie together the
- * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
+ * first) with the subsequent content header and content bodies.
*/
protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
- /**
- * Counter to ensure unique queue names
- */
+ /** Counter to ensure unique queue names */
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
@@ -108,8 +102,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
/**
- * No-arg constructor for use by test subclass - has to initialise final vars
- * NOT intended for use other then for test
+ * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
+ * test
*/
public AMQProtocolSession()
{
@@ -147,7 +141,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
-
+
_minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
@@ -207,8 +201,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
/**
* Store the SASL client currently being used for the authentication handshake
*
- * @param client if non-null, stores this in the session. if null clears any existing client
- * being stored
+ * @param client if non-null, stores this in the session. if null clears any existing client being stored
*/
public void setSaslClient(SaslClient client)
{
@@ -237,10 +230,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
- * This is invoked on the MINA dispatcher thread.
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
+ * dispatcher thread.
*
* @param message
+ *
* @throws AMQException if this was not expected
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
@@ -295,8 +289,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Deliver a message to the appropriate session, removing the unprocessed message
- * from our map
+ * Deliver a message to the appropriate session, removing the unprocessed message from our map
*
* @param channelId the channel id the message should be delivered to
* @param msg the message
@@ -309,8 +302,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -377,15 +370,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Called from the ChannelClose handler when a channel close frame is received.
- * This method decides whether this is a response or an initiation. The latter
- * case causes the AMQSession to be closed and an exception to be thrown if
+ * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
+ * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
* appropriate.
*
* @param channelId the id of the channel (session)
- * @return true if the client must respond to the server, i.e. if the server
- * initiated the channel close, false if the channel close is just the server
- * responding to the client's earlier request to close the channel.
+ *
+ * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
+ * the channel close is just the server responding to the client's earlier request to close the channel.
*/
public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
{
@@ -450,9 +442,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /**
- * @param delay delay in seconds (not ms)
- */
+ /** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
if (delay > 0)
@@ -475,7 +465,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolMajorVersion = versionMajor;
_protocolMinorVersion = versionMinor;
- _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 69042d08ea..8368eee125 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -38,12 +38,10 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
/**
- * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up
- * the underlying connector, which currently always uses TCP/IP sockets. It creates the
- * "protocol handler" which deals with MINA protocol events.
- * <p/>
- * Could be extended in future to support different transport types by turning this into concrete class/interface
- * combo.
+ * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
+ * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
+ * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete
+ * class/interface combo.
*/
public class TransportConnection
{
@@ -61,22 +59,6 @@ public class TransportConnection
private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
- static
- {
- _acceptor = new VmPipeAcceptor();
-
- IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- public static ITransportConnection getInstance() throws AMQTransportConnectionException
- {
- AMQBrokerDetails details = new AMQBrokerDetails();
- details.setTransport(BrokerDetails.TCP);
- return getInstance(details);
- }
-
public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -182,7 +164,14 @@ public class TransportConnection
public static void createVMBroker(int port) throws AMQVMBrokerCreationException
{
+ if (_acceptor == null)
+ {
+ _acceptor = new VmPipeAcceptor();
+ IoServiceConfig config = _acceptor.getDefaultConfig();
+
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
if (!_inVmPipeAddress.containsKey(port))
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index cb4ef01d25..642b928d81 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -44,6 +44,10 @@ public class FlowControllingBlockingQueue
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+ public boolean isEmpty()
+ {
+ return _queue.isEmpty();
+ }
public interface ThresholdListener
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index fe15fa5155..1e50a62fee 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -39,8 +39,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
public class PropertyValueTest extends TestCase implements MessageListener
{
@@ -59,19 +59,13 @@ public class PropertyValueTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- try
- {
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
- }
- catch (Exception e)
- {
- fail("Unable to initialilse connection: " + e);
- }
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killVMBroker(1);
}
private void init(AMQConnection connection) throws Exception
@@ -91,14 +85,48 @@ public class PropertyValueTest extends TestCase implements MessageListener
connection.start();
}
- public void test() throws Exception
+ public void testOnce()
{
- int count = _count;
- send(count);
- waitFor(count);
- check();
- _logger.info("Completed without failure");
- _connection.close();
+ runBatch(1);
+ }
+
+ public void test50()
+ {
+ runBatch(50);
+ }
+
+ private void runBatch(int runSize)
+ {
+ try
+ {
+ int run = 0;
+ while (run < runSize)
+ {
+ _logger.error("Run Number:" + run++);
+ try
+ {
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+
+ _logger.error("End Run Number:" + (run - 1));
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
}
void send(int count) throws JMSException
@@ -138,7 +166,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.info("Message:" + m);
+ _logger.trace("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly",
m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue"));
@@ -150,7 +178,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setShortProperty("Short", (short) Short.MAX_VALUE);
m.setStringProperty("String", "Test");
- _logger.info("Sending Msg:" + m);
+ _logger.debug("Sending Msg:" + m);
producer.send(m);
}
}
@@ -206,8 +234,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
Assert.assertEquals("Check String properties are correctly transported",
"Test", m.getStringProperty("String"));
}
+ received.clear();
assertEqual(messages.iterator(), actual.iterator());
+
+ messages.clear();
}
private static void assertEqual(Iterator expected, Iterator actual)
@@ -269,11 +300,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
{
test._count = Integer.parseInt(argv[1]);
}
- test.test();
+ test.testOnce();
}
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class));
+ return new junit.framework.TestSuite(PropertyValueTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index a56bae3d70..7762cb3fe9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidClientConnection;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -62,14 +63,14 @@ public class MessageRequeueTest extends TestCase
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
- private boolean passed=false;
+ private boolean passed = false;
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -85,21 +86,28 @@ public class MessageRequeueTest extends TestCase
{
super.tearDown();
- if (!passed)
+ if (!passed) // clean up
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
conn.consume(queue, consumeTimeout);
+
+ conn.disconnect();
}
TransportConnection.killVMBroker(1);
}
- /** multiple consumers */
+ /**
+ * multiple consumers
+ *
+ * @throws javax.jms.JMSException if a JMS problem occurs
+ * @throws InterruptedException on timeout
+ */
public void testDrain() throws JMSException, InterruptedException
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -170,6 +178,7 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
+ passed = true;
}
/** multiple consumers */
@@ -186,8 +195,8 @@ public class MessageRequeueTest extends TestCase
Thread t4 = new Thread(c4);
t1.start();
-// t2.start();
-// t3.start();
+ t2.start();
+ t3.start();
// t4.start();
try
@@ -230,7 +239,7 @@ public class MessageRequeueTest extends TestCase
}
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed=true;
+ passed = true;
}
class Consumer implements Runnable
@@ -248,7 +257,7 @@ public class MessageRequeueTest extends TestCase
try
{
_logger.info("consumer-" + id + ": starting");
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -318,286 +327,51 @@ public class MessageRequeueTest extends TestCase
}
- public class QpidClientConnection implements ExceptionListener
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection()
- {
- super();
- setVirtualHost("/test");
- setBrokerList(BROKER);
- setPrefetch(5000);
- }
-
-
- public void connect() throws JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = factory.createConnection();
-
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (URLSyntaxException e)
- {
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
- }
- }
- }
-
- public void disconnect() throws JMSException
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit() throws JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
+ int run = 0;
+ while (run < 10)
{
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
+ run++;
-
- /** override as necessary */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- m.setIntProperty("index", i + 1);
- sender.send(m);
- }
-
- session.commit();
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- *
- * @return the content of the text message if any
- *
- * @throws javax.jms.JMSException any exception that occured
- */
- public Message getNextMessage(String queueName, long readTimeout) throws JMSException
- {
- if (!connected)
+ if (_logger.isInfoEnabled())
{
- connect();
+ _logger.info("testRequeue run " + run);
}
- Queue queue = session.createQueue(queueName);
+ String virtualHost = "/test";
+ String brokerlist = BROKER;
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- final MessageConsumer consumer = session.createConsumer(queue);
+ Connection conn = new AMQConnection(brokerUrl);
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- Message result;
+ _logger.debug("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message);
- }
- else if (null == message)
+ try
{
- result = null;
+ Thread.sleep(2000);
}
- else
+ catch (InterruptedException e)
{
- _logger.info("warning: received non-text message");
- result = message;
+ //
}
- return result;
- }
+ _logger.debug("Receiving msg");
+ Message msg = consumer.receive(1000);
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- *
- * @return The string content of the text message, if any received
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public Message getNextMessage(String queueName) throws JMSException
- {
- return getNextMessage(queueName, 0);
- }
+ assertNotNull("Message should not be null", msg);
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- *
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
-
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
- session.commit();
+ // As we have not ack'd message will be requeued.
+ _logger.debug("Close Consumer");
consumer.close();
- _logger.info("consumed: " + messagesReceived);
- }
- }
-
-
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
- {
- String virtualHost = "/test";
- String brokerlist = "vm://:1";
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- Connection conn = new AMQConnection(brokerUrl);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue q = session.createQueue(queue);
-
- _logger.info("Create Consumer");
- MessageConsumer consumer = session.createConsumer(q);
-
- try
- {
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- //
+ _logger.debug("Close Connection");
+ conn.close();
}
-
- _logger.info("Receiving msg");
- Message msg = consumer.receive();
-
- assertNotNull("Message should not be null", msg);
-
- _logger.info("Close Consumer");
- consumer.close();
-
- _logger.info("Close Connection");
- conn.close();
}
} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
new file mode 100644
index 0000000000..f2afa472ab
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -0,0 +1,268 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}
+
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
index 883d5018cd..4636f44795 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
@@ -181,8 +181,37 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
@Override
public Iterator<E> iterator()
{
- throw new RuntimeException("Not Implemented");
+ final Iterator<E> mainMessageIterator = super.iterator();
+ return new Iterator<E>()
+ {
+ final Iterator<E> _headIterator = _messageHead.iterator();
+ final Iterator<E> _mainIterator = mainMessageIterator;
+
+ Iterator<E> last;
+
+ public boolean hasNext()
+ {
+ return _headIterator.hasNext() || _mainIterator.hasNext();
+ }
+ public E next()
+ {
+ if (_headIterator.hasNext())
+ {
+ last = _headIterator;
+ return _headIterator.next();
+ }
+ else
+ {
+ last = _mainIterator;
+ return _mainIterator.next();
+ }
+ }
+ public void remove()
+ {
+ last.remove();
+ }
+ };
}
@Override
diff --git a/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
new file mode 100644
index 0000000000..bbac06382d
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+
+public class VMTestCase extends TestCase
+{
+ protected long RECEIVE_TIMEOUT = 1000l; // 1 sec
+ protected long CLOSE_TIMEOUT = 10000l; // 10 secs
+
+ protected Context _context;
+ protected String _clientID;
+ protected String _virtualhost;
+ protected String _brokerlist;
+
+ protected final Map<String, String> _connections = new HashMap<String, String>();
+ protected final Map<String, String> _queues = new HashMap<String, String>();
+ protected final Map<String, String> _topics = new HashMap<String, String>();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (Exception e)
+ {
+ fail("Unable to create broker: " + e);
+ }
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ if (_clientID == null)
+ {
+ _clientID = this.getClass().getName();
+ }
+
+ if (_virtualhost == null)
+ {
+ _virtualhost = "/test";
+ }
+
+ if (_brokerlist == null)
+ {
+ _brokerlist = "vm://:1";
+ }
+
+ env.put("connectionfactory.connection", "amqp://client:client@" +
+ _clientID + _virtualhost + "?brokerlist='" + _brokerlist + "'");
+
+ for (Map.Entry<String, String> c : _connections.entrySet())
+ {
+ env.put("connectionfactory." + c.getKey(), c.getValue());
+ }
+
+ env.put("queue.queue", "queue");
+
+ for (Map.Entry<String, String> q : _queues.entrySet())
+ {
+ env.put("queue." + q.getKey(), q.getValue());
+ }
+
+ env.put("topic.topic", "topic");
+
+ for (Map.Entry<String, String> t : _topics.entrySet())
+ {
+ env.put("topic." + t.getKey(), t.getValue());
+ }
+
+ _context = factory.getInitialContext(env);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killVMBroker(1);
+ super.tearDown();
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
new file mode 100644
index 0000000000..ac65eec979
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.log4j.Logger;
+
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+import javax.jms.QueueReceiver;
+import javax.jms.Message;
+import java.util.Enumeration;
+
+public class QueueBrowserTest extends VMTestCase
+{
+ private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class);
+
+ private static final int MSG_COUNT = 10;
+
+ private Connection _clientConnection;
+ private Session _clientSession;
+ private Queue _queue;
+
+ public void setUp() throws Exception
+ {
+
+ super.setUp();
+
+ _queue = (Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+
+ //Create Producer put some messages on the queue
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ }
+
+ /*
+ * Test Messages Remain on Queue
+ * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there
+ *
+ */
+
+ public void queueBrowserMsgsRemainOnQueueTest() throws JMSException
+ {
+
+ // create QueueBrowser
+ _logger.info("Creating Queue Browser");
+
+ QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+ // check for messages
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Checking for " + MSG_COUNT + " messages with QueueBrowser");
+ }
+
+ int msgCount = 0;
+ Enumeration msgs = queueBrowser.getEnumeration();
+
+ while (msgs.hasMoreElements())
+ {
+ msgs.nextElement();
+ msgCount++;
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Found " + msgCount + " messages total in browser");
+ }
+
+ // check to see if all messages found
+// assertEquals("browser did not find all messages", MSG_COUNT, msgCount);
+ if (msgCount != MSG_COUNT)
+ {
+ _logger.warn(msgCount + "/" + MSG_COUNT + " messages received.");
+ }
+
+ //Close browser
+ queueBrowser.close();
+
+ // VERIFY
+
+ // continue and try to receive all messages
+ MessageConsumer consumer = _clientSession.createConsumer(_queue);
+
+ _logger.info("Verify messages are still on the queue");
+
+ Message tempMsg;
+
+ for (msgCount = 0; msgCount < MSG_COUNT; msgCount++)
+ {
+ tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+ if (tempMsg == null)
+ {
+ fail("Message " + msgCount + " not retrieved from queue");
+ }
+ }
+
+ _logger.info("All messages recevied from queue");
+ }
+
+
+}