diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-23 10:20:44 +0000 |
| commit | de248153d311b1e0211dfe3230afcb306f3c0192 (patch) | |
| tree | 30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/systests/src | |
| parent | f74e4dc27d1655760d0213fd60cc75c272c26f00 (diff) | |
| download | qpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz | |
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages
QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription
BROKER
AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver.
BasicRejectMethodHandler - initial place holder.
TxRollbackHandler - Added comment
AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue.
AMQQueue - added the queue reference to the Subscription creation
ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355
DeliveryManager - adjusted deliver call to allow delivery to the head of the queue.
Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed.
SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription.
SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure.
SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue.
AMQStateManager - Added BasicRejectMethodHandler
TransactionalContext - Added option to deliver the messages to the front of the queue.
LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue.
NonTransactionalContext - Added option to deliver the messages to the front of the queue.
DeliverMessageOperation.java DELELTED AS NOT USED.
CLIENT
AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover.
BasicMessageConsumer - updated the rollback so that it sends reject messages to server.
AbstractJMSMessage - whitespace + added extra message properties to the toString()
AMQProtocolHandler - whitespace + extra debug output
TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on.
CLUSTER
ClusteredQueue - AMQQueue changes for message deliveryFirst.
RemoteSubscriptionImpl - Implementation of Subscription
SYSTESTS
AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst.
AMQQueueMBeanTest - changes for message deliveryFirst.
ConcurrencyTest - changes for message deliveryFirst.
DeliveryManagerTest - changes for message deliveryFirst.
SubscriptionTestHelper - Implementation of Subscription
WhiteSpace only
UnacknowledgedMessageMapImpl.java
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
5 files changed, 40 insertions, 15 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6beeb92053..ccd23bc0bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -272,9 +272,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. * @param msg + * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, AMQMessage msg) throws AMQException + public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException { messages.add(new HeadersExchangeTest.Message(msg)); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 2d0315d7f5..26332579cb 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -19,7 +19,6 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -151,7 +150,7 @@ public class AMQQueueMBeanTest extends TestCase AMQMessage msg = message(false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); - _queue.process(_storeContext, msg); + _queue.process(_storeContext, msg, false); msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); _queueMBean.viewMessageContent(id); try @@ -216,7 +215,7 @@ public class AMQQueueMBeanTest extends TestCase } for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i]); + _queue.process(_storeContext, messages[i], false); } for (int i = 0; i < messages.length; i++) diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java index 6f3d42d090..4971db2d28 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -194,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper AMQMessage msg = nextMessage(); if (msg != null) { - _deliveryMgr.deliver(null, new AMQShortString(toString()), msg); + _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false); } } } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java index e1be640c8e..dc5a6d3cf6 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -49,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } SubscriptionTestHelper s1 = new SubscriptionTestHelper("1"); @@ -59,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = batch; i < messages.length; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } assertTrue(s1.getMessages().isEmpty()); @@ -97,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper for (int i = 0; i < batch; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } assertEquals(batch, s1.getMessages().size()); @@ -111,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper s1.setSuspended(true); for (int i = batch; i < messages.length; i++) { - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false); } _mgr.processAsync(new OnCurrentThreadExecutor()); @@ -133,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper try { AMQMessage msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } @@ -155,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper _subscriptions.addSubscriber(s); s.setSuspended(true); AMQMessage msg = message(true); - _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg); + _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false); msg.checkDeliveredToConsumer(); fail("expected exception did not occur"); } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index b3574ecba4..01eb2ba6a2 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -67,12 +67,22 @@ public class SubscriptionTestHelper implements Subscription return isSuspended; } - public boolean wouldSuspend(AMQMessage msg) + public boolean wouldSuspend(AMQMessage msg) { return isSuspended; } - + public void addToResendQueue(AMQMessage msg) + { + //no-op + } + + public Object getSendLock() + { + return new Object(); + } + + public void queueDeleted(AMQQueue queue) { } @@ -92,7 +102,17 @@ public class SubscriptionTestHelper implements Subscription return null; } - public void enqueueForPreDelivery(AMQMessage msg) + public Queue<AMQMessage> getResendQueue() + { + return null; + } + + public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages) + { + return messages; + } + + public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst) { //no-op } @@ -107,9 +127,14 @@ public class SubscriptionTestHelper implements Subscription //no-op } + public boolean isClosed() + { + return false; + } + public boolean isBrowser() { - return false; //To change body of implemented methods use File | Settings | File Templates. + return false; } public int hashCode() |
