From de248153d311b1e0211dfe3230afcb306f3c0192 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 23 Feb 2007 10:20:44 +0000 Subject: QPID-346 Message loss after rollback QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68 --- .../exchange/AbstractHeadersExchangeTestBase.java | 3 +- .../qpid/server/queue/AMQQueueMBeanTest.java | 5 ++-- .../apache/qpid/server/queue/ConcurrencyTest.java | 2 +- .../qpid/server/queue/DeliveryManagerTest.java | 12 ++++---- .../qpid/server/queue/SubscriptionTestHelper.java | 33 +++++++++++++++++++--- 5 files changed, 40 insertions(+), 15 deletions(-) (limited to 'java/systests/src') diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6beeb92053..ccd23bc0bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -272,9 +272,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. * @param msg + * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, AMQMessage msg) throws AMQException + public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException { messages.add(new HeadersExchangeTest.Message(msg)); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 2d0315d7f5..26332579cb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -19,7 +19,6 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -151,7 +150,7 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage msg = message(false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - _queue.process(_storeContext, msg); + _queue.process(_storeContext, msg, false); msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); _queueMBean.viewMessageContent(id); try @@ -216,7 +215,7 @@ public class AMQQueueMBeanTest extends TestCase } for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i]); + _queue.process(_storeContext, messages[i], false); } for (int i = 0; i < messages.length; i++) diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java index 6f3d42d090..4971db2d28 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -194,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper AMQMessage msg = nextMessage(); if (msg != null) { - _deliveryMgr.deliver(null, new AMQShortString(toString()), msg); + _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); } } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index e1be640c8e..dc5a6d3cf6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -49,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); @@ -59,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = batch; i < messages.length; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } assertTrue(s1.getMessages().isEmpty()); @@ -97,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } assertEquals(batch, s1.getMessages().size()); @@ -111,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper s1.setSuspended(true); for (int i = batch; i < messages.length; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } _mgr.processAsync(new OnCurrentThreadExecutor()); @@ -133,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper try { AMQMessage msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -155,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index b3574ecba4..01eb2ba6a2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -67,12 +67,22 @@ public class SubscriptionTestHelper implements Subscription return isSuspended; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(AMQMessage msg) { return isSuspended; } - + public void addToResendQueue(AMQMessage msg) + { + //no-op + } + + public Object getSendLock() + { + return new Object(); + } + + public void queueDeleted(AMQQueue queue) { } @@ -92,7 +102,17 @@ public class SubscriptionTestHelper implements Subscription return null; } - public void enqueueForPreDelivery(AMQMessage msg) + public Queue getResendQueue() + { + return null; + } + + public Queue getNextQueue(Queue messages) + { + return messages; + } + + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) { //no-op } @@ -107,9 +127,14 @@ public class SubscriptionTestHelper implements Subscription //no-op } + public boolean isClosed() + { + return false; + } + public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int hashCode() -- cgit v1.2.1