summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-30 18:54:48 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-30 18:54:48 +0000
commit33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (patch)
tree1fdc64001d5e0bf1f34883927d7901b456b7bd3b /dotnet/Qpid.Client/Client
parent8f21f5d6cacd35e6fe04a0b4a5567fc4929f997e (diff)
downloadqpid-python-33c04c7e619a65e2d92ac231805e8ad27f4a29c2.tar.gz
QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow
QPID-137 Ported AcknowledgeModes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481035 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs35
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs103
2 files changed, 47 insertions, 91 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 0ab3fd3411..2ffd6505c6 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -43,7 +43,7 @@ namespace Qpid.Client
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
- internal const int DEFAULT_PREFETCH = 5000;
+ internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
private AMQConnection _connection;
@@ -273,6 +273,7 @@ namespace Qpid.Client
public void Commit()
{
+ // FIXME: Fail over safety. Needs FailoverSupport?
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a transacted session
@@ -297,6 +298,7 @@ namespace Qpid.Client
public void Rollback()
{
+ // FIXME: Fail over safety. Needs FailoverSupport?
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a transacted session
@@ -489,25 +491,26 @@ namespace Qpid.Client
}
public IMessageConsumer CreateConsumer(string queueName,
- int prefetch,
+ int prefetchLow,
+ int prefetchHigh,
bool noLocal,
bool exclusive,
bool durable,
string subscriptionName)
{
- _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}",
- queueName, prefetch, noLocal, exclusive, durable, subscriptionName));
- return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName);
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}",
+ queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName));
+ return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName);
}
private IMessageConsumer CreateConsumerImpl(string queueName,
- int prefetch,
- bool noLocal,
- bool exclusive,
- bool durable,
- string subscriptionName)
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
{
-
if (durable || subscriptionName != null)
{
throw new NotImplementedException(); // TODO: durable subscriptions.
@@ -518,7 +521,8 @@ namespace Qpid.Client
CheckNotClosed();
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
- _messageFactoryRegistry, this);
+ _messageFactoryRegistry, this,
+ prefetchHigh, prefetchLow, exclusive);
try
{
RegisterConsumer(consumer);
@@ -710,9 +714,8 @@ namespace Qpid.Client
/// <param name="consumer"></param>
void RegisterConsumer(BasicMessageConsumer consumer)
{
- String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal,
+ String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
consumer.Exclusive, consumer.AcknowledgeMode);
-
consumer.ConsumerTag = consumerTag;
_consumers.Add(consumerTag, consumer);
}
@@ -744,8 +747,7 @@ namespace Qpid.Client
}
}
- private String ConsumeFromQueue(String queueName, int prefetch,
- bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
{
// Need to generate a consumer tag on the client so we can exploit the nowait flag.
String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
@@ -973,7 +975,6 @@ namespace Qpid.Client
public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
- _logger.Info("XXX sending ack: " + ackFrame);
if (_logger.IsDebugEnabled)
{
_logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 611a4e5351..133707c609 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -82,10 +82,15 @@ namespace Qpid.Client
/// </summary>
private readonly object _syncLock = new object();
- /**
- * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover
- */
- private int _prefetch;
+ /// <summary>
+ /// We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ /// </summary>
+ private int _prefetchHigh;
+
+ /// <summary>
+ /// We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ /// </summary>
+ private int _prefetchLow;
/// <summary>
/// When true indicates that either a message listener is set or that
@@ -108,8 +113,20 @@ namespace Qpid.Client
/// </summary>
private long _lastDeliveryTag;
- public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
- MessageFactoryRegistry messageFactory, AmqChannel channel)
+ /// <summary>
+ /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ /// </summary>
+ private int _outstanding;
+
+ /// <summary>
+ /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
+ /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow
+ /// </summary>
+ private bool _dups_ok_acknowledge_send;
+
+ internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
+ MessageFactoryRegistry messageFactory, AmqChannel channel,
+ int prefetchHigh, int prefetchLow, bool exclusive)
{
_channelId = channelId;
_queueName = queueName;
@@ -117,6 +134,9 @@ namespace Qpid.Client
_messageFactory = messageFactory;
_channel = channel;
_acknowledgeMode = _channel.AcknowledgeMode;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
+ _exclusive = exclusive;
}
#region IMessageConsumer Members
@@ -302,65 +322,6 @@ namespace Qpid.Client
}
}
-// /// <summary>
-// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case
-// /// of a message listener or a synchronous receive() caller.
-// /// </summary>
-// /// <param name="messageFrame">the raw unprocessed mesage</param>
-// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param>
-// /// <param name="channelId">channel on which this message was sent</param>
-// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId)
-// {
-// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
-// if (_logger.IsDebugEnabled)
-// {
-// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
-// }
-// try
-// {
-// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
-// messageFrame.DeliverBody.Redelivered,
-// messageFrame.ContentHeader,
-// messageFrame.Bodies);
-
-// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
-// {
-// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
-// }*/
-// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-// {
-// // we set the session so that when the user calls acknowledge() it can call the method on session
-// // to send out the appropriate frame
-// jmsMessage.Channel = _channel;
-// }
-
-// lock (_syncLock)
-// {
-// if (_messageListener != null)
-// {
-//#if __MonoCS__
-// _messageListener(jmsMessage);
-//#else
-// _messageListener.Invoke(jmsMessage);
-//#endif
-// }
-// else
-// {
-// _synchronousQueue.Enqueue(jmsMessage);
-// }
-// }
-// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
-// {
-// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
-// }
-// }
-// catch (Exception e)
-// {
-// _logger.Error("Caught exception (dump follows) - ignoring...", e);
-// }
-// }
-
-
/**
* Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
* of a message listener or a synchronous receive() caller.
@@ -465,11 +426,6 @@ namespace Qpid.Client
DeregisterConsumer();
}
- public int Prefetch
- {
- get { return _prefetch; }
- }
-
public string QueueName
{
get { return _queueName; }
@@ -509,7 +465,6 @@ namespace Qpid.Client
AbstractQmsMessage msg = (AbstractQmsMessage) m;
switch (AcknowledgeMode)
{
-/* TODO
case AcknowledgeMode.DupsOkAcknowledge:
if (++_outstanding >= _prefetchHigh)
{
@@ -519,16 +474,16 @@ namespace Qpid.Client
{
_dups_ok_acknowledge_send = false;
}
-
if (_dups_ok_acknowledge_send)
{
- _channel.AcknowledgeMessage(msg.getDeliveryTag(), true);
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
}
break;
- */
+
case AcknowledgeMode.AutoAcknowledge:
_channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
break;
+
case AcknowledgeMode.SessionTransacted:
_lastDeliveryTag = msg.DeliveryTag;
break;