diff options
Diffstat (limited to 'java/broker/src')
| -rw-r--r-- | java/broker/src/org/apache/qpid/server/AMQChannel.java | 90 |
1 files changed, 63 insertions, 27 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java index 055af16159..d5cd21486f 100644 --- a/java/broker/src/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java @@ -43,7 +43,6 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -58,7 +57,9 @@ public class AMQChannel private boolean _transactional; - private long _prefetchCount; + private long _prefetch_HighWaterMark; + + private long _prefetch_LowWaterMark; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -112,7 +113,8 @@ public class AMQChannel throws AMQException { _channelId = channelId; - _prefetchCount = DEFAULT_PREFETCH; + _prefetch_HighWaterMark = DEFAULT_PREFETCH; + _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; _txnBuffer = new TxnBuffer(_messageStore); @@ -135,14 +137,35 @@ public class AMQChannel public long getPrefetchCount() { - return _prefetchCount; + return _prefetch_HighWaterMark; } public void setPrefetchCount(long prefetchCount) { - _prefetchCount = prefetchCount; + _prefetch_HighWaterMark = prefetchCount; + } + + public long getPrefetchLowMarkCount() + { + return _prefetch_LowWaterMark; + } + + public void setPrefetchLowMarkCount(long prefetchCount) + { + _prefetch_LowWaterMark = prefetchCount; + } + + public long getPrefetchHighMarkCount() + { + return _prefetch_HighWaterMark; + } + + public void setPrefetchHighMarkCount(long prefetchCount) + { + _prefetch_HighWaterMark = prefetchCount; } + public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException { _currentMessage = new AMQMessage(_messageStore, publishBody); @@ -191,7 +214,7 @@ public class AMQChannel if (_transactional) { //don't create a transaction unless needed - if(_currentMessage.isPersistent()) + if (_currentMessage.isPersistent()) { _txnBuffer.containsPersistentChanges(); } @@ -210,7 +233,7 @@ public class AMQChannel _exchanges.routeContent(_currentMessage); _txnBuffer.enlist(new Cleanup(_currentMessage)); } - catch(RequiredDeliveryException e) + catch (RequiredDeliveryException e) { //Can only be due to the mandatory flag, as no attempt //has yet been made to deliver the message. The @@ -236,7 +259,7 @@ public class AMQChannel _currentMessage.checkDeliveredToConsumer(); } finally - { + { _currentMessage.decrementReference(); _currentMessage = null; } @@ -430,20 +453,20 @@ public class AMQChannel if (_transactional) { //check that the tag exists to give early failure - if(!multiple || deliveryTag > 0) + if (!multiple || deliveryTag > 0) { checkAck(deliveryTag); } //we use a single txn op for all acks and update this op //as new acks come in. If this is the first ack in the txn //we will need to create and enlist the op. - if(ackOp == null) + if (ackOp == null) { ackOp = new TxAck(new AckMap()); _txnBuffer.enlist(ackOp); } //update the op to include this ack request - if(multiple && deliveryTag == 0) + if (multiple && deliveryTag == 0) { synchronized(_unacknowledgedMessageMapLock) { @@ -471,7 +494,7 @@ public class AMQChannel { throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel"); } - } + } } private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException @@ -484,7 +507,7 @@ public class AMQChannel if (deliveryTag == 0) { //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages. - _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size()); + _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size()); acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values()); _unacknowledgedMessageMap.clear(); } @@ -518,9 +541,9 @@ public class AMQChannel } }// synchronized - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + + _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " + acked.size() + " items."); } @@ -540,13 +563,13 @@ public class AMQChannel if (msg == null) { - _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); + _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId); } msg.discard(); - if (_log.isDebugEnabled()) + if (_log.isTraceEnabled()) { - _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag); + _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag); } } @@ -569,13 +592,24 @@ public class AMQChannel //noinspection SynchronizeOnNonFinalField synchronized(_unacknowledgedMessageMapLock) { - suspend = _unacknowledgedMessageMap.size() >= _prefetchCount; + suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark; } setSuspended(suspend); } public void setSuspended(boolean suspended) { + boolean isSuspended = _suspended.get(); + + if (isSuspended && !suspended) + { + synchronized(_unacknowledgedMessageMapLock) + { + // Continue being suspended if we are above the _prefetch_LowWaterMark + suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark; + } + } + boolean wasSuspended = _suspended.getAndSet(suspended); if (wasSuspended != suspended) { @@ -602,16 +636,16 @@ public class AMQChannel public void commit() throws AMQException { - if(ackOp != null) + if (ackOp != null) { ackOp.consolidate(); - if(ackOp.checkPersistent()) + if (ackOp.checkPersistent()) { - _txnBuffer.containsPersistentChanges(); + _txnBuffer.containsPersistentChanges(); } ackOp = null;//already enlisted, after commit will reset regardless of outcome } - + _txnBuffer.commit(); //TODO: may need to return 'immediate' messages at this point } @@ -629,7 +663,8 @@ public class AMQChannel { StringBuilder sb = new StringBuilder(30); sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional); - sb.append(", prefetch count: ").append(_prefetchCount); + sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark); + sb.append("/").append(_prefetch_HighWaterMark); return sb.toString(); } @@ -654,7 +689,7 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) { - for(AMQDataBlock block : _returns) + for (AMQDataBlock block : _returns) { session.writeFrame(block); } @@ -669,6 +704,7 @@ public class AMQChannel { impl().collect(deliveryTag, multiple, msgs); } + public void remove(List<UnacknowledgedMessage> msgs) { impl().remove(msgs); @@ -742,7 +778,7 @@ public class AMQChannel { _msg.decrementReference(); } - catch(AMQException e) + catch (AMQException e) { _log.error("On commiting transaction, failed to cleanup unused message: " + e, e); } @@ -750,7 +786,7 @@ public class AMQChannel { _msg.checkDeliveredToConsumer(); } - catch(NoConsumersException e) + catch (NoConsumersException e) { //TODO: store this for delivery after the commit-ok _returns.add(e.getReturnMessage(_channelId)); |
