diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2006-10-24 11:30:52 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2006-10-24 11:30:52 +0000 |
| commit | c58c6dedd5f681f9b06b7bdd3a50dc671f995d93 (patch) | |
| tree | 288c315cdde5655e1c02dfbae694169722b46432 /java/broker/test/src | |
| parent | ed231b97ebaa82e34fc8ee720964b79d301bc8a4 (diff) | |
| download | qpid-python-c58c6dedd5f681f9b06b7bdd3a50dc671f995d93.tar.gz | |
QPID-44
Added High and Low water marking to AMQChannel.java.
Currently the low water mark defaults to half the High value.
Test for high and low water marks.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@467310 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/test/src')
| -rw-r--r-- | java/broker/test/src/org/apache/qpid/server/queue/AckTest.java | 99 |
1 files changed, 88 insertions, 11 deletions
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java index f22ccb8caf..4495ac555b 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java @@ -18,11 +18,9 @@ package org.apache.qpid.server.queue; import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; -import org.junit.Ignore; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.AMQChannel; @@ -30,7 +28,9 @@ import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.log4j.Logger; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; import java.util.Iterator; import java.util.Map; @@ -72,13 +72,30 @@ public class AckTest private void publishMessages(int count) throws AMQException { + publishMessages(count, false); + } + + private void publishMessages(int count, boolean persistent) throws AMQException + { for (int i = 1; i <= count; i++) { BasicPublishBody publishBody = new BasicPublishBody(); publishBody.routingKey = "rk"; publishBody.exchange = "someExchange"; AMQMessage msg = new AMQMessage(_messageStore, publishBody); - msg.setContentHeaderBody(new ContentHeaderBody()); + if (persistent) + { + BasicContentHeaderProperties b = new BasicContentHeaderProperties(); + //This is DeliveryMode.PERSISTENT + b.setDeliveryMode((byte) 2); + ContentHeaderBody cb = new ContentHeaderBody(); + cb.properties = b; + msg.setContentHeaderBody(cb); + } + else + { + msg.setContentHeaderBody(new ContentHeaderBody()); + } _subscription.send(msg, _queue); } } @@ -87,15 +104,16 @@ public class AckTest * Tests that the acknowledgements are correctly associated with a channel and * order is preserved when acks are enabled */ - @Test @Ignore /* FIXME: broken at the moment */ + @Test public void ackChannelAssociationTest() throws AMQException { _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); final int msgCount = 10; - publishMessages(msgCount); + publishMessages(msgCount, true); Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); + assertTrue(_messageStore.getMessageMap().size() == msgCount); Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); for (int i = 1; i <= map.size(); i++) @@ -105,6 +123,8 @@ public class AckTest UnacknowledgedMessage unackedMsg = entry.getValue(); assertTrue(unackedMsg.queue == _queue); } + + assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMap().size() == msgCount); } @@ -182,9 +202,8 @@ public class AckTest } } - /** + /** * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. - * */ @Test public void multiAckAllReceivedTest() throws AMQException @@ -210,17 +229,74 @@ public class AckTest } + @Test + public void testPrefetchHighLow() throws AMQException + { + int lowMark = 5; + int highMark = 10; + + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _channel.setPrefetchLowMarkCount(lowMark); + _channel.setPrefetchHighMarkCount(highMark); + + assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); + assertTrue(_channel.getPrefetchHighMarkCount() == highMark); + + publishMessages(highMark); + + // at this point we should have sent out only highMark messages + // which have not bee received so will be queued up in the channel + // which should be suspended + assertTrue(_subscription.isSuspended()); + Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == highMark); + + //acknowledge messages so we are just above lowMark + _channel.acknowledgeMessage(lowMark - 1, true); + + //we should still be suspended + assertTrue(_subscription.isSuspended()); + assertTrue(map.size() == lowMark + 1); + + //acknowledge one more message + _channel.acknowledgeMessage(lowMark, true); + + //and suspension should be lifted + assertTrue(!_subscription.isSuspended()); + + //pubilsh more msgs so we are just below the limit + publishMessages(lowMark - 1); + + //we should not be suspended + assertTrue(!_subscription.isSuspended()); + + //acknowledge all messages + _channel.acknowledgeMessage(0, true); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e) + { + _log.error("Error: " + e, e); + } + //map will be empty + assertTrue(map.size() == 0); + } @Test public void testPrefetch() throws AMQException { _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); _channel.setPrefetchCount(5); + + assertTrue(_channel.getPrefetchCount() == 5); + final int msgCount = 5; publishMessages(msgCount); // at this point we should have sent out only 5 messages with a further 5 queued - // up in the channel which should be suspended + // up in the channel which should now be suspended assertTrue(_subscription.isSuspended()); Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); @@ -237,6 +313,7 @@ public class AckTest assertTrue(map.size() == 0); } + public static junit.framework.Test suite() { return new JUnit4TestAdapter(AckTest.class); |
