summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/org/apache/qpid/server/AMQChannel.java90
1 files changed, 63 insertions, 27 deletions
diff --git a/java/broker/src/org/apache/qpid/server/AMQChannel.java b/java/broker/src/org/apache/qpid/server/AMQChannel.java
index 055af16159..d5cd21486f 100644
--- a/java/broker/src/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/org/apache/qpid/server/AMQChannel.java
@@ -43,7 +43,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,7 +57,9 @@ public class AMQChannel
private boolean _transactional;
- private long _prefetchCount;
+ private long _prefetch_HighWaterMark;
+
+ private long _prefetch_LowWaterMark;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -112,7 +113,8 @@ public class AMQChannel
throws AMQException
{
_channelId = channelId;
- _prefetchCount = DEFAULT_PREFETCH;
+ _prefetch_HighWaterMark = DEFAULT_PREFETCH;
+ _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
_txnBuffer = new TxnBuffer(_messageStore);
@@ -135,14 +137,35 @@ public class AMQChannel
public long getPrefetchCount()
{
- return _prefetchCount;
+ return _prefetch_HighWaterMark;
}
public void setPrefetchCount(long prefetchCount)
{
- _prefetchCount = prefetchCount;
+ _prefetch_HighWaterMark = prefetchCount;
+ }
+
+ public long getPrefetchLowMarkCount()
+ {
+ return _prefetch_LowWaterMark;
+ }
+
+ public void setPrefetchLowMarkCount(long prefetchCount)
+ {
+ _prefetch_LowWaterMark = prefetchCount;
+ }
+
+ public long getPrefetchHighMarkCount()
+ {
+ return _prefetch_HighWaterMark;
+ }
+
+ public void setPrefetchHighMarkCount(long prefetchCount)
+ {
+ _prefetch_HighWaterMark = prefetchCount;
}
+
public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
{
_currentMessage = new AMQMessage(_messageStore, publishBody);
@@ -191,7 +214,7 @@ public class AMQChannel
if (_transactional)
{
//don't create a transaction unless needed
- if(_currentMessage.isPersistent())
+ if (_currentMessage.isPersistent())
{
_txnBuffer.containsPersistentChanges();
}
@@ -210,7 +233,7 @@ public class AMQChannel
_exchanges.routeContent(_currentMessage);
_txnBuffer.enlist(new Cleanup(_currentMessage));
}
- catch(RequiredDeliveryException e)
+ catch (RequiredDeliveryException e)
{
//Can only be due to the mandatory flag, as no attempt
//has yet been made to deliver the message. The
@@ -236,7 +259,7 @@ public class AMQChannel
_currentMessage.checkDeliveredToConsumer();
}
finally
- {
+ {
_currentMessage.decrementReference();
_currentMessage = null;
}
@@ -430,20 +453,20 @@ public class AMQChannel
if (_transactional)
{
//check that the tag exists to give early failure
- if(!multiple || deliveryTag > 0)
+ if (!multiple || deliveryTag > 0)
{
checkAck(deliveryTag);
}
//we use a single txn op for all acks and update this op
//as new acks come in. If this is the first ack in the txn
//we will need to create and enlist the op.
- if(ackOp == null)
+ if (ackOp == null)
{
ackOp = new TxAck(new AckMap());
_txnBuffer.enlist(ackOp);
}
//update the op to include this ack request
- if(multiple && deliveryTag == 0)
+ if (multiple && deliveryTag == 0)
{
synchronized(_unacknowledgedMessageMapLock)
{
@@ -471,7 +494,7 @@ public class AMQChannel
{
throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
}
- }
+ }
}
private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
@@ -484,7 +507,7 @@ public class AMQChannel
if (deliveryTag == 0)
{
//Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
- _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
+ _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
_unacknowledgedMessageMap.clear();
}
@@ -518,9 +541,9 @@ public class AMQChannel
}
}// synchronized
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
+ _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
acked.size() + " items.");
}
@@ -540,13 +563,13 @@ public class AMQChannel
if (msg == null)
{
- _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
+ _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
}
msg.discard();
- if (_log.isDebugEnabled())
+ if (_log.isTraceEnabled())
{
- _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+ _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
}
}
@@ -569,13 +592,24 @@ public class AMQChannel
//noinspection SynchronizeOnNonFinalField
synchronized(_unacknowledgedMessageMapLock)
{
- suspend = _unacknowledgedMessageMap.size() >= _prefetchCount;
+ suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
}
setSuspended(suspend);
}
public void setSuspended(boolean suspended)
{
+ boolean isSuspended = _suspended.get();
+
+ if (isSuspended && !suspended)
+ {
+ synchronized(_unacknowledgedMessageMapLock)
+ {
+ // Continue being suspended if we are above the _prefetch_LowWaterMark
+ suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
+ }
+ }
+
boolean wasSuspended = _suspended.getAndSet(suspended);
if (wasSuspended != suspended)
{
@@ -602,16 +636,16 @@ public class AMQChannel
public void commit() throws AMQException
{
- if(ackOp != null)
+ if (ackOp != null)
{
ackOp.consolidate();
- if(ackOp.checkPersistent())
+ if (ackOp.checkPersistent())
{
- _txnBuffer.containsPersistentChanges();
+ _txnBuffer.containsPersistentChanges();
}
ackOp = null;//already enlisted, after commit will reset regardless of outcome
}
-
+
_txnBuffer.commit();
//TODO: may need to return 'immediate' messages at this point
}
@@ -629,7 +663,8 @@ public class AMQChannel
{
StringBuilder sb = new StringBuilder(30);
sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
- sb.append(", prefetch count: ").append(_prefetchCount);
+ sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
+ sb.append("/").append(_prefetch_HighWaterMark);
return sb.toString();
}
@@ -654,7 +689,7 @@ public class AMQChannel
public void processReturns(AMQProtocolSession session)
{
- for(AMQDataBlock block : _returns)
+ for (AMQDataBlock block : _returns)
{
session.writeFrame(block);
}
@@ -669,6 +704,7 @@ public class AMQChannel
{
impl().collect(deliveryTag, multiple, msgs);
}
+
public void remove(List<UnacknowledgedMessage> msgs)
{
impl().remove(msgs);
@@ -742,7 +778,7 @@ public class AMQChannel
{
_msg.decrementReference();
}
- catch(AMQException e)
+ catch (AMQException e)
{
_log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
}
@@ -750,7 +786,7 @@ public class AMQChannel
{
_msg.checkDeliveredToConsumer();
}
- catch(NoConsumersException e)
+ catch (NoConsumersException e)
{
//TODO: store this for delivery after the commit-ok
_returns.add(e.getReturnMessage(_channelId));