diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2006-10-24 11:30:52 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2006-10-24 11:30:52 +0000 |
| commit | c58c6dedd5f681f9b06b7bdd3a50dc671f995d93 (patch) | |
| tree | 288c315cdde5655e1c02dfbae694169722b46432 | |
| parent | ed231b97ebaa82e34fc8ee720964b79d301bc8a4 (diff) | |
| download | qpid-python-c58c6dedd5f681f9b06b7bdd3a50dc671f995d93.tar.gz | |
QPID-44
Added High and Low water marking to AMQChannel.java.
Currently the low water mark defaults to half the High value.
Test for high and low water marks.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@467310 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | java/broker/src/org/apache/qpid/server/AMQChannel.java | 90 | ||||
| -rw-r--r-- | java/broker/test/src/org/apache/qpid/server/queue/AckTest.java | 99 |
2 files changed, 151 insertions, 38 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 055af16159..d5cd21486f 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -43,7 +43,6 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -58,7 +57,9 @@ public class AMQChannel private boolean _transactional; - private long _prefetchCount; + private long _prefetch_HighWaterMark; + + private long _prefetch_LowWaterMark; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -112,7 +113,8 @@ public class AMQChannel throws AMQException { _channelId = channelId; - _prefetchCount = DEFAULT_PREFETCH; + _prefetch_HighWaterMark = DEFAULT_PREFETCH; + _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; _txnBuffer = new TxnBuffer(_messageStore); @@ -135,14 +137,35 @@ public class AMQChannel public long getPrefetchCount() { - return _prefetchCount; + return _prefetch_HighWaterMark; } public void setPrefetchCount(long prefetchCount) { - _prefetchCount = prefetchCount; + _prefetch_HighWaterMark = prefetchCount; + } + + public long getPrefetchLowMarkCount() + { + return _prefetch_LowWaterMark; + } + + public void setPrefetchLowMarkCount(long prefetchCount) + { + _prefetch_LowWaterMark = prefetchCount; + } + + public long getPrefetchHighMarkCount() + { + return _prefetch_HighWaterMark; + } + + public void setPrefetchHighMarkCount(long prefetchCount) + { + _prefetch_HighWaterMark = prefetchCount; } + public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException { _currentMessage = new AMQMessage(_messageStore, publishBody); @@ -191,7 +214,7 @@ public class AMQChannel if (_transactional) { //don't create a transaction unless needed - if(_currentMessage.isPersistent()) + if (_currentMessage.isPersistent()) { _txnBuffer.containsPersistentChanges(); } @@ -210,7 +233,7 @@ public class AMQChannel _exchanges.routeContent(_currentMessage); _txnBuffer.enlist(new Cleanup(_currentMessage)); } - catch(RequiredDeliveryException e) + catch (RequiredDeliveryException e) { //Can only be due to the mandatory flag, as no attempt //has yet been made to deliver the message. The @@ -236,7 +259,7 @@ public class AMQChannel _currentMessage.checkDeliveredToConsumer(); } finally - { + { _currentMessage.decrementReference(); _currentMessage = null; } @@ -430,20 +453,20 @@ public class AMQChannel if (_transactional) { //check that the tag exists to give early failure - if(!multiple || deliveryTag > 0) + if (!multiple || deliveryTag > 0) { checkAck(deliveryTag); } //we use a single txn op for all acks and update this op //as new acks come in. If this is the first ack in the txn //we will need to create and enlist the op. - if(ackOp == null) + if (ackOp == null) { ackOp = new TxAck(new AckMap()); _txnBuffer.enlist(ackOp); } //update the op to include this ack request - if(multiple && deliveryTag == 0) + if (multiple && deliveryTag == 0) { synchronized(_unacknowledgedMessageMapLock) { @@ -471,7 +494,7 @@ public class AMQChannel { throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); } - } + } } private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException @@ -484,7 +507,7 @@ public class AMQChannel if (deliveryTag == 0) { //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages. - _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size()); + _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size()); acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values()); _unacknowledgedMessageMap.clear(); } @@ -518,9 +541,9 @@ public class AMQChannel } }// synchronized - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + + _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + acked.size() + " items."); } @@ -540,13 +563,13 @@ public class AMQChannel if (msg == null) { - _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); + _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } msg.discard(); - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag); + _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag); } } @@ -569,13 +592,24 @@ public class AMQChannel //noinspection SynchronizeOnNonFinalField synchronized(_unacknowledgedMessageMapLock) { - suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; + suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; } setSuspended(suspend); } public void setSuspended(boolean suspended) { + boolean isSuspended = _suspended.get(); + + if (isSuspended && !suspended) + { + synchronized(_unacknowledgedMessageMapLock) + { + // Continue being suspended if we are above the _prefetch_LowWaterMark + suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark; + } + } + boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { @@ -602,16 +636,16 @@ public class AMQChannel public void commit() throws AMQException { - if(ackOp != null) + if (ackOp != null) { ackOp.consolidate(); - if(ackOp.checkPersistent()) + if (ackOp.checkPersistent()) { - _txnBuffer.containsPersistentChanges(); + _txnBuffer.containsPersistentChanges(); } ackOp = null;//already enlisted, after commit will reset regardless of outcome } - + _txnBuffer.commit(); //TODO: may need to return 'immediate' messages at this point } @@ -629,7 +663,8 @@ public class AMQChannel { StringBuilder sb = new StringBuilder(30); sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional); - sb.append(", prefetch count: ").append(_prefetchCount); + sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark); + sb.append("/").append(_prefetch_HighWaterMark); return sb.toString(); } @@ -654,7 +689,7 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) { - for(AMQDataBlock block : _returns) + for (AMQDataBlock block : _returns) { session.writeFrame(block); } @@ -669,6 +704,7 @@ public class AMQChannel { impl().collect(deliveryTag, multiple, msgs); } + public void remove(List<UnacknowledgedMessage> msgs) { impl().remove(msgs); @@ -742,7 +778,7 @@ public class AMQChannel { _msg.decrementReference(); } - catch(AMQException e) + catch (AMQException e) { _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); } @@ -750,7 +786,7 @@ public class AMQChannel { _msg.checkDeliveredToConsumer(); } - catch(NoConsumersException e) + catch (NoConsumersException e) { //TODO: store this for delivery after the commit-ok _returns.add(e.getReturnMessage(_channelId)); diff --git a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java index f22ccb8caf..4495ac555b 100644 --- a/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java +++ b/java/broker/test/src/org/apache/qpid/server/queue/AckTest.java @@ -18,11 +18,9 @@ package org.apache.qpid.server.queue; import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; -import org.junit.Ignore; +import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.AMQChannel; @@ -30,7 +28,9 @@ import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.log4j.Logger; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; import java.util.Iterator; import java.util.Map; @@ -72,13 +72,30 @@ public class AckTest private void publishMessages(int count) throws AMQException { + publishMessages(count, false); + } + + private void publishMessages(int count, boolean persistent) throws AMQException + { for (int i = 1; i <= count; i++) { BasicPublishBody publishBody = new BasicPublishBody(); publishBody.routingKey = "rk"; publishBody.exchange = "someExchange"; AMQMessage msg = new AMQMessage(_messageStore, publishBody); - msg.setContentHeaderBody(new ContentHeaderBody()); + if (persistent) + { + BasicContentHeaderProperties b = new BasicContentHeaderProperties(); + //This is DeliveryMode.PERSISTENT + b.setDeliveryMode((byte) 2); + ContentHeaderBody cb = new ContentHeaderBody(); + cb.properties = b; + msg.setContentHeaderBody(cb); + } + else + { + msg.setContentHeaderBody(new ContentHeaderBody()); + } _subscription.send(msg, _queue); } } @@ -87,15 +104,16 @@ public class AckTest * Tests that the acknowledgements are correctly associated with a channel and * order is preserved when acks are enabled */ - @Test @Ignore /* FIXME: broken at the moment */ + @Test public void ackChannelAssociationTest() throws AMQException { _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); final int msgCount = 10; - publishMessages(msgCount); + publishMessages(msgCount, true); Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == msgCount); + assertTrue(_messageStore.getMessageMap().size() == msgCount); Iterator<Map.Entry<Long, UnacknowledgedMessage>> it = map.entrySet().iterator(); for (int i = 1; i <= map.size(); i++) @@ -105,6 +123,8 @@ public class AckTest UnacknowledgedMessage unackedMsg = entry.getValue(); assertTrue(unackedMsg.queue == _queue); } + + assertTrue(map.size() == msgCount); assertTrue(_messageStore.getMessageMap().size() == msgCount); } @@ -182,9 +202,8 @@ public class AckTest } } - /** + /** * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. - * */ @Test public void multiAckAllReceivedTest() throws AMQException @@ -210,17 +229,74 @@ public class AckTest } + @Test + public void testPrefetchHighLow() throws AMQException + { + int lowMark = 5; + int highMark = 10; + + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _channel.setPrefetchLowMarkCount(lowMark); + _channel.setPrefetchHighMarkCount(highMark); + + assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); + assertTrue(_channel.getPrefetchHighMarkCount() == highMark); + + publishMessages(highMark); + + // at this point we should have sent out only highMark messages + // which have not bee received so will be queued up in the channel + // which should be suspended + assertTrue(_subscription.isSuspended()); + Map<Long, AMQChannel.UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == highMark); + + //acknowledge messages so we are just above lowMark + _channel.acknowledgeMessage(lowMark - 1, true); + + //we should still be suspended + assertTrue(_subscription.isSuspended()); + assertTrue(map.size() == lowMark + 1); + + //acknowledge one more message + _channel.acknowledgeMessage(lowMark, true); + + //and suspension should be lifted + assertTrue(!_subscription.isSuspended()); + + //pubilsh more msgs so we are just below the limit + publishMessages(lowMark - 1); + + //we should not be suspended + assertTrue(!_subscription.isSuspended()); + + //acknowledge all messages + _channel.acknowledgeMessage(0, true); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e) + { + _log.error("Error: " + e, e); + } + //map will be empty + assertTrue(map.size() == 0); + } @Test public void testPrefetch() throws AMQException { _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); _channel.setPrefetchCount(5); + + assertTrue(_channel.getPrefetchCount() == 5); + final int msgCount = 5; publishMessages(msgCount); // at this point we should have sent out only 5 messages with a further 5 queued - // up in the channel which should be suspended + // up in the channel which should now be suspended assertTrue(_subscription.isSuspended()); Map<Long, UnacknowledgedMessage> map = _channel.getUnacknowledgedMessageMap(); assertTrue(map.size() == 5); @@ -237,6 +313,7 @@ public class AckTest assertTrue(map.size() == 0); } + public static junit.framework.Test suite() { return new JUnit4TestAdapter(AckTest.class); |
