summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-23 10:20:44 +0000
commitde248153d311b1e0211dfe3230afcb306f3c0192 (patch)
tree30412df8d5fd1d3ef076fba0903301b25f8a7518 /java/systests/src
parentf74e4dc27d1655760d0213fd60cc75c272c26f00 (diff)
downloadqpid-python-de248153d311b1e0211dfe3230afcb306f3c0192.tar.gz
QPID-346 Message loss after rollback
QPID-348 Problems of prefetching messages QPID-355 Closing a consumer does not ensure messages delivery will stop for that subscription BROKER AMQChannel - updated requeue to either resend via the Delivery Manager not directly via msg.writedeliver. BasicRejectMethodHandler - initial place holder. TxRollbackHandler - Added comment AMQMessage - added ability to record who has taken the message so that it can be resent to that subscriber on resend/requeue. AMQQueue - added the queue reference to the Subscription creation ConcurrentSelectorDeliveryManager - Added methods to correctly monitor the size of queue messages. Including messages on the resend queue of a Subscriber. Additional locking to ensure that messages are not sent to the subscriber after Closure. QPID-355 DeliveryManager - adjusted deliver call to allow delivery to the head of the queue. Subscription - changes to allow selction of queue(resend or predelivery) methods to add to resend and getSendLock to ensure that sending to the Subscription is allowed. SubscriptionFactory - changes to allow the AMQQueue to be passed to the Subscription. SubscriptionImpl - implementation of the interfaces. Local storage of messages to be resent and requeuing of the messages during closure. SubscriptionSet - changes to retrieve the actual stored Subscription when performing removeSubscriber. So we have access to the the resend queue. AMQStateManager - Added BasicRejectMethodHandler TransactionalContext - Added option to deliver the messages to the front of the queue. LocalTransactionalContext - cleared the _postComitDeliveryList on rollback. Added option to deliver the messages to the front of the queue. NonTransactionalContext - Added option to deliver the messages to the front of the queue. DeliverMessageOperation.java DELELTED AS NOT USED. CLIENT AMQSession - added ability to get the pervious state of the dispatcher when settting Stopped, fixed the channel suspension problems on broker so uncommented clean up code in rollback and recover. BasicMessageConsumer - updated the rollback so that it sends reject messages to server. AbstractJMSMessage - whitespace + added extra message properties to the toString() AMQProtocolHandler - whitespace + extra debug output TransactedTest - updated expect to prevent NPEs also added extra logging to help understand what is going on. CLUSTER ClusteredQueue - AMQQueue changes for message deliveryFirst. RemoteSubscriptionImpl - Implementation of Subscription SYSTESTS AbstractHeadersExchangeTestBase - AMQQueue changes for message deliveryFirst. AMQQueueMBeanTest - changes for message deliveryFirst. ConcurrencyTest - changes for message deliveryFirst. DeliveryManagerTest - changes for message deliveryFirst. SubscriptionTestHelper - Implementation of Subscription WhiteSpace only UnacknowledgedMessageMapImpl.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510897 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java5
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java12
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java33
5 files changed, 40 insertions, 15 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6beeb92053..ccd23bc0bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -272,9 +272,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* not invoked. It is unnecessary since for this test we only care to know whether the message was
* sent to the queue; the queue processing logic is not being tested.
* @param msg
+ * @param deliverFirst
* @throws AMQException
*/
- public void process(StoreContext context, AMQMessage msg) throws AMQException
+ public void process(StoreContext context, AMQMessage msg, boolean deliverFirst) throws AMQException
{
messages.add(new HeadersExchangeTest.Message(msg));
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 2d0315d7f5..26332579cb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -19,7 +19,6 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -151,7 +150,7 @@ public class AMQQueueMBeanTest extends TestCase
AMQMessage msg = message(false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
- _queue.process(_storeContext, msg);
+ _queue.process(_storeContext, msg, false);
msg.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
_queueMBean.viewMessageContent(id);
try
@@ -216,7 +215,7 @@ public class AMQQueueMBeanTest extends TestCase
}
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i]);
+ _queue.process(_storeContext, messages[i], false);
}
for (int i = 0; i < messages.length; i++)
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index 6f3d42d090..4971db2d28 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -194,7 +194,7 @@ public class ConcurrencyTest extends MessageTestHelper
AMQMessage msg = nextMessage();
if (msg != null)
{
- _deliveryMgr.deliver(null, new AMQShortString(toString()), msg);
+ _deliveryMgr.deliver(null, new AMQShortString(toString()), msg, false);
}
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index e1be640c8e..dc5a6d3cf6 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -49,7 +49,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
@@ -59,7 +59,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
assertTrue(s1.getMessages().isEmpty());
@@ -97,7 +97,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
for (int i = 0; i < batch; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
assertEquals(batch, s1.getMessages().size());
@@ -111,7 +111,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
s1.setSuspended(true);
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i]);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, messages[i], false);
}
_mgr.processAsync(new OnCurrentThreadExecutor());
@@ -133,7 +133,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
try
{
AMQMessage msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
@@ -155,7 +155,7 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
_subscriptions.addSubscriber(s);
s.setSuspended(true);
AMQMessage msg = message(true);
- _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg);
+ _mgr.deliver(_storeContext, DEFAULT_QUEUE_NAME, msg, false);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index b3574ecba4..01eb2ba6a2 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -67,12 +67,22 @@ public class SubscriptionTestHelper implements Subscription
return isSuspended;
}
- public boolean wouldSuspend(AMQMessage msg)
+ public boolean wouldSuspend(AMQMessage msg)
{
return isSuspended;
}
-
+ public void addToResendQueue(AMQMessage msg)
+ {
+ //no-op
+ }
+
+ public Object getSendLock()
+ {
+ return new Object();
+ }
+
+
public void queueDeleted(AMQQueue queue)
{
}
@@ -92,7 +102,17 @@ public class SubscriptionTestHelper implements Subscription
return null;
}
- public void enqueueForPreDelivery(AMQMessage msg)
+ public Queue<AMQMessage> getResendQueue()
+ {
+ return null;
+ }
+
+ public Queue<AMQMessage> getNextQueue(Queue<AMQMessage> messages)
+ {
+ return messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg, boolean deliverFirst)
{
//no-op
}
@@ -107,9 +127,14 @@ public class SubscriptionTestHelper implements Subscription
//no-op
}
+ public boolean isClosed()
+ {
+ return false;
+ }
+
public boolean isBrowser()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public int hashCode()