summaryrefslogtreecommitdiff
path: root/java/broker/test/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-10-24 11:30:52 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-10-24 11:30:52 +0000
commitc58c6dedd5f681f9b06b7bdd3a50dc671f995d93 (patch)
tree288c315cdde5655e1c02dfbae694169722b46432 /java/broker/test/src
parented231b97ebaa82e34fc8ee720964b79d301bc8a4 (diff)
downloadqpid-python-c58c6dedd5f681f9b06b7bdd3a50dc671f995d93.tar.gz
QPID-44
Added High and Low water marking to AMQChannel.java. Currently the low water mark defaults to half the High value. Test for high and low water marks. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@467310 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/test/src')
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/AckTest.java99
1 files changed, 88 insertions, 11 deletions
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
index f22ccb8caf..4495ac555b 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
@@ -18,11 +18,9 @@
package org.apache.qpid.server.queue;
import junit.framework.JUnit4TestAdapter;
-import static org.junit.Assert.assertTrue;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Ignore;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.AMQChannel;
@@ -30,7 +28,9 @@ import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.log4j.Logger;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
import java.util.Iterator;
import java.util.Map;
@@ -72,13 +72,30 @@ public class AckTest
private void publishMessages(int count) throws AMQException
{
+ publishMessages(count, false);
+ }
+
+ private void publishMessages(int count, boolean persistent) throws AMQException
+ {
for (int i = 1; i <= count; i++)
{
BasicPublishBody publishBody = new BasicPublishBody();
publishBody.routingKey = "rk";
publishBody.exchange = "someExchange";
AMQMessage msg = new AMQMessage(_messageStore, publishBody);
- msg.setContentHeaderBody(new ContentHeaderBody());
+ if (persistent)
+ {
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ //This is DeliveryMode.PERSISTENT
+ b.setDeliveryMode((byte) 2);
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+ msg.setContentHeaderBody(cb);
+ }
+ else
+ {
+ msg.setContentHeaderBody(new ContentHeaderBody());
+ }
_subscription.send(msg, _queue);
}
}
@@ -87,15 +104,16 @@ public class AckTest
* Tests that the acknowledgements are correctly associated with a channel and
* order is preserved when acks are enabled
*/
- @Test @Ignore /* FIXME: broken at the moment */
+ @Test
public void ackChannelAssociationTest() throws AMQException
{
_subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
final int msgCount = 10;
- publishMessages(msgCount);
+ publishMessages(msgCount, true);
Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
+ assertTrue(_messageStore.getMessageMap().size() == msgCount);
Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
for (int i = 1; i <= map.size(); i++)
@@ -105,6 +123,8 @@ public class AckTest
UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
}
+
+ assertTrue(map.size() == msgCount);
assertTrue(_messageStore.getMessageMap().size() == msgCount);
}
@@ -182,9 +202,8 @@ public class AckTest
}
}
- /**
+ /**
* Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
- *
*/
@Test
public void multiAckAllReceivedTest() throws AMQException
@@ -210,17 +229,74 @@ public class AckTest
}
+ @Test
+ public void testPrefetchHighLow() throws AMQException
+ {
+ int lowMark = 5;
+ int highMark = 10;
+
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _channel.setPrefetchLowMarkCount(lowMark);
+ _channel.setPrefetchHighMarkCount(highMark);
+
+ assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
+ assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
+
+ publishMessages(highMark);
+
+ // at this point we should have sent out only highMark messages
+ // which have not bee received so will be queued up in the channel
+ // which should be suspended
+ assertTrue(_subscription.isSuspended());
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == highMark);
+
+ //acknowledge messages so we are just above lowMark
+ _channel.acknowledgeMessage(lowMark - 1, true);
+
+ //we should still be suspended
+ assertTrue(_subscription.isSuspended());
+ assertTrue(map.size() == lowMark + 1);
+
+ //acknowledge one more message
+ _channel.acknowledgeMessage(lowMark, true);
+
+ //and suspension should be lifted
+ assertTrue(!_subscription.isSuspended());
+
+ //pubilsh more msgs so we are just below the limit
+ publishMessages(lowMark - 1);
+
+ //we should not be suspended
+ assertTrue(!_subscription.isSuspended());
+
+ //acknowledge all messages
+ _channel.acknowledgeMessage(0, true);
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ //map will be empty
+ assertTrue(map.size() == 0);
+ }
@Test
public void testPrefetch() throws AMQException
{
_subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
_channel.setPrefetchCount(5);
+
+ assertTrue(_channel.getPrefetchCount() == 5);
+
final int msgCount = 5;
publishMessages(msgCount);
// at this point we should have sent out only 5 messages with a further 5 queued
- // up in the channel which should be suspended
+ // up in the channel which should now be suspended
assertTrue(_subscription.isSuspended());
Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
@@ -237,6 +313,7 @@ public class AckTest
assertTrue(map.size() == 0);
}
+
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter(AckTest.class);