summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-10-24 11:30:52 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-10-24 11:30:52 +0000
commitc58c6dedd5f681f9b06b7bdd3a50dc671f995d93 (patch)
tree288c315cdde5655e1c02dfbae694169722b46432
parented231b97ebaa82e34fc8ee720964b79d301bc8a4 (diff)
downloadqpid-python-c58c6dedd5f681f9b06b7bdd3a50dc671f995d93.tar.gz
QPID-44
Added High and Low water marking to AMQChannel.java. Currently the low water mark defaults to half the High value. Test for high and low water marks. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@467310 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java90
-rw-r--r--java/broker/test/src/org/apache/qpid/server/queue/AckTest.java99
2 files changed, 151 insertions, 38 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 055af16159..d5cd21486f 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -43,7 +43,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,7 +57,9 @@ public class AMQChannel
private boolean _transactional;
- private long _prefetchCount;
+ private long _prefetch_HighWaterMark;
+
+ private long _prefetch_LowWaterMark;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -112,7 +113,8 @@ public class AMQChannel
throws AMQException
{
_channelId = channelId;
- _prefetchCount = DEFAULT_PREFETCH;
+ _prefetch_HighWaterMark = DEFAULT_PREFETCH;
+ _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
_txnBuffer = new TxnBuffer(_messageStore);
@@ -135,14 +137,35 @@ public class AMQChannel
public long getPrefetchCount()
{
- return _prefetchCount;
+ return _prefetch_HighWaterMark;
}
public void setPrefetchCount(long prefetchCount)
{
- _prefetchCount = prefetchCount;
+ _prefetch_HighWaterMark = prefetchCount;
+ }
+
+ public long getPrefetchLowMarkCount()
+ {
+ return _prefetch_LowWaterMark;
+ }
+
+ public void setPrefetchLowMarkCount(long prefetchCount)
+ {
+ _prefetch_LowWaterMark = prefetchCount;
+ }
+
+ public long getPrefetchHighMarkCount()
+ {
+ return _prefetch_HighWaterMark;
+ }
+
+ public void setPrefetchHighMarkCount(long prefetchCount)
+ {
+ _prefetch_HighWaterMark = prefetchCount;
}
+
public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
{
_currentMessage = new AMQMessage(_messageStore, publishBody);
@@ -191,7 +214,7 @@ public class AMQChannel
if (_transactional)
{
//don't create a transaction unless needed
- if(_currentMessage.isPersistent())
+ if (_currentMessage.isPersistent())
{
_txnBuffer.containsPersistentChanges();
}
@@ -210,7 +233,7 @@ public class AMQChannel
_exchanges.routeContent(_currentMessage);
_txnBuffer.enlist(new Cleanup(_currentMessage));
}
- catch(RequiredDeliveryException e)
+ catch (RequiredDeliveryException e)
{
//Can only be due to the mandatory flag, as no attempt
//has yet been made to deliver the message. The
@@ -236,7 +259,7 @@ public class AMQChannel
_currentMessage.checkDeliveredToConsumer();
}
finally
- {
+ {
_currentMessage.decrementReference();
_currentMessage = null;
}
@@ -430,20 +453,20 @@ public class AMQChannel
if (_transactional)
{
//check that the tag exists to give early failure
- if(!multiple || deliveryTag > 0)
+ if (!multiple || deliveryTag > 0)
{
checkAck(deliveryTag);
}
//we use a single txn op for all acks and update this op
//as new acks come in. If this is the first ack in the txn
//we will need to create and enlist the op.
- if(ackOp == null)
+ if (ackOp == null)
{
ackOp = new TxAck(new AckMap());
_txnBuffer.enlist(ackOp);
}
//update the op to include this ack request
- if(multiple && deliveryTag == 0)
+ if (multiple && deliveryTag == 0)
{
synchronized(_unacknowledgedMessageMapLock)
{
@@ -471,7 +494,7 @@ public class AMQChannel
{
throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
}
- }
+ }
}
private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
@@ -484,7 +507,7 @@ public class AMQChannel
if (deliveryTag == 0)
{
//Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
- _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
+ _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
_unacknowledgedMessageMap.clear();
}
@@ -518,9 +541,9 @@ public class AMQChannel
}
}// synchronized
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
+ _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
acked.size() + " items.");
}
@@ -540,13 +563,13 @@ public class AMQChannel
if (msg == null)
{
- _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
+ _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
msg.discard();
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+ _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
}
}
@@ -569,13 +592,24 @@ public class AMQChannel
//noinspection SynchronizeOnNonFinalField
synchronized(_unacknowledgedMessageMapLock)
{
- suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
+ suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
}
setSuspended(suspend);
}
public void setSuspended(boolean suspended)
{
+ boolean isSuspended = _suspended.get();
+
+ if (isSuspended && !suspended)
+ {
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ // Continue being suspended if we are above the _prefetch_LowWaterMark
+ suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
+ }
+ }
+
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
@@ -602,16 +636,16 @@ public class AMQChannel
public void commit() throws AMQException
{
- if(ackOp != null)
+ if (ackOp != null)
{
ackOp.consolidate();
- if(ackOp.checkPersistent())
+ if (ackOp.checkPersistent())
{
- _txnBuffer.containsPersistentChanges();
+ _txnBuffer.containsPersistentChanges();
}
ackOp = null;//already enlisted, after commit will reset regardless of outcome
}
-
+
_txnBuffer.commit();
//TODO: may need to return 'immediate' messages at this point
}
@@ -629,7 +663,8 @@ public class AMQChannel
{
StringBuilder sb = new StringBuilder(30);
sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
- sb.append(", prefetch count: ").append(_prefetchCount);
+ sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
+ sb.append("/").append(_prefetch_HighWaterMark);
return sb.toString();
}
@@ -654,7 +689,7 @@ public class AMQChannel
public void processReturns(AMQProtocolSession session)
{
- for(AMQDataBlock block : _returns)
+ for (AMQDataBlock block : _returns)
{
session.writeFrame(block);
}
@@ -669,6 +704,7 @@ public class AMQChannel
{
impl().collect(deliveryTag, multiple, msgs);
}
+
public void remove(List<UnacknowledgedMessage> msgs)
{
impl().remove(msgs);
@@ -742,7 +778,7 @@ public class AMQChannel
{
_msg.decrementReference();
}
- catch(AMQException e)
+ catch (AMQException e)
{
_log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
}
@@ -750,7 +786,7 @@ public class AMQChannel
{
_msg.checkDeliveredToConsumer();
}
- catch(NoConsumersException e)
+ catch (NoConsumersException e)
{
//TODO: store this for delivery after the commit-ok
_returns.add(e.getReturnMessage(_channelId));
diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
index f22ccb8caf..4495ac555b 100644
--- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java
@@ -18,11 +18,9 @@
package org.apache.qpid.server.queue;
import junit.framework.JUnit4TestAdapter;
-import static org.junit.Assert.assertTrue;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Ignore;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.AMQChannel;
@@ -30,7 +28,9 @@ import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.log4j.Logger;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
import java.util.Iterator;
import java.util.Map;
@@ -72,13 +72,30 @@ public class AckTest
private void publishMessages(int count) throws AMQException
{
+ publishMessages(count, false);
+ }
+
+ private void publishMessages(int count, boolean persistent) throws AMQException
+ {
for (int i = 1; i <= count; i++)
{
BasicPublishBody publishBody = new BasicPublishBody();
publishBody.routingKey = "rk";
publishBody.exchange = "someExchange";
AMQMessage msg = new AMQMessage(_messageStore, publishBody);
- msg.setContentHeaderBody(new ContentHeaderBody());
+ if (persistent)
+ {
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ //This is DeliveryMode.PERSISTENT
+ b.setDeliveryMode((byte) 2);
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+ msg.setContentHeaderBody(cb);
+ }
+ else
+ {
+ msg.setContentHeaderBody(new ContentHeaderBody());
+ }
_subscription.send(msg, _queue);
}
}
@@ -87,15 +104,16 @@ public class AckTest
* Tests that the acknowledgements are correctly associated with a channel and
* order is preserved when acks are enabled
*/
- @Test @Ignore /* FIXME: broken at the moment */
+ @Test
public void ackChannelAssociationTest() throws AMQException
{
_subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
final int msgCount = 10;
- publishMessages(msgCount);
+ publishMessages(msgCount, true);
Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
+ assertTrue(_messageStore.getMessageMap().size() == msgCount);
Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator();
for (int i = 1; i <= map.size(); i++)
@@ -105,6 +123,8 @@ public class AckTest
UnacknowledgedMessage unackedMsg = entry.getValue();
assertTrue(unackedMsg.queue == _queue);
}
+
+ assertTrue(map.size() == msgCount);
assertTrue(_messageStore.getMessageMap().size() == msgCount);
}
@@ -182,9 +202,8 @@ public class AckTest
}
}
- /**
+ /**
* Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs.
- *
*/
@Test
public void multiAckAllReceivedTest() throws AMQException
@@ -210,17 +229,74 @@ public class AckTest
}
+ @Test
+ public void testPrefetchHighLow() throws AMQException
+ {
+ int lowMark = 5;
+ int highMark = 10;
+
+ _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
+ _channel.setPrefetchLowMarkCount(lowMark);
+ _channel.setPrefetchHighMarkCount(highMark);
+
+ assertTrue(_channel.getPrefetchLowMarkCount() == lowMark);
+ assertTrue(_channel.getPrefetchHighMarkCount() == highMark);
+
+ publishMessages(highMark);
+
+ // at this point we should have sent out only highMark messages
+ // which have not bee received so will be queued up in the channel
+ // which should be suspended
+ assertTrue(_subscription.isSuspended());
+ Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
+ assertTrue(map.size() == highMark);
+
+ //acknowledge messages so we are just above lowMark
+ _channel.acknowledgeMessage(lowMark - 1, true);
+
+ //we should still be suspended
+ assertTrue(_subscription.isSuspended());
+ assertTrue(map.size() == lowMark + 1);
+
+ //acknowledge one more message
+ _channel.acknowledgeMessage(lowMark, true);
+
+ //and suspension should be lifted
+ assertTrue(!_subscription.isSuspended());
+
+ //pubilsh more msgs so we are just below the limit
+ publishMessages(lowMark - 1);
+
+ //we should not be suspended
+ assertTrue(!_subscription.isSuspended());
+
+ //acknowledge all messages
+ _channel.acknowledgeMessage(0, true);
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ _log.error("Error: " + e, e);
+ }
+ //map will be empty
+ assertTrue(map.size() == 0);
+ }
@Test
public void testPrefetch() throws AMQException
{
_subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true);
_channel.setPrefetchCount(5);
+
+ assertTrue(_channel.getPrefetchCount() == 5);
+
final int msgCount = 5;
publishMessages(msgCount);
// at this point we should have sent out only 5 messages with a further 5 queued
- // up in the channel which should be suspended
+ // up in the channel which should now be suspended
assertTrue(_subscription.isSuspended());
Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 5);
@@ -237,6 +313,7 @@ public class AckTest
assertTrue(map.size() == 0);
}
+
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter(AckTest.class);