diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-30 18:54:48 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-30 18:54:48 +0000 |
| commit | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (patch) | |
| tree | 1fdc64001d5e0bf1f34883927d7901b456b7bd3b /dotnet/Qpid.Client/Client | |
| parent | 8f21f5d6cacd35e6fe04a0b4a5567fc4929f997e (diff) | |
| download | qpid-python-33c04c7e619a65e2d92ac231805e8ad27f4a29c2.tar.gz | |
QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow
QPID-137 Ported AcknowledgeModes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481035 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
| -rw-r--r-- | dotnet/Qpid.Client/Client/AmqChannel.cs | 35 | ||||
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 103 |
2 files changed, 47 insertions, 91 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 0ab3fd3411..2ffd6505c6 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -43,7 +43,7 @@ namespace Qpid.Client // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - internal const int DEFAULT_PREFETCH = 5000; + internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH; private AMQConnection _connection; @@ -273,6 +273,7 @@ namespace Qpid.Client public void Commit() { + // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session @@ -297,6 +298,7 @@ namespace Qpid.Client public void Rollback() { + // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session @@ -489,25 +491,26 @@ namespace Qpid.Client } public IMessageConsumer CreateConsumer(string queueName, - int prefetch, + int prefetchLow, + int prefetchHigh, bool noLocal, bool exclusive, bool durable, string subscriptionName) { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}", - queueName, prefetch, noLocal, exclusive, durable, subscriptionName)); - return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName); + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName)); + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName); } private IMessageConsumer CreateConsumerImpl(string queueName, - int prefetch, - bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) { - if (durable || subscriptionName != null) { throw new NotImplementedException(); // TODO: durable subscriptions. @@ -518,7 +521,8 @@ namespace Qpid.Client CheckNotClosed(); BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, - _messageFactoryRegistry, this); + _messageFactoryRegistry, this, + prefetchHigh, prefetchLow, exclusive); try { RegisterConsumer(consumer); @@ -710,9 +714,8 @@ namespace Qpid.Client /// <param name="consumer"></param> void RegisterConsumer(BasicMessageConsumer consumer) { - String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal, + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; _consumers.Add(consumerTag, consumer); } @@ -744,8 +747,7 @@ namespace Qpid.Client } } - private String ConsumeFromQueue(String queueName, int prefetch, - bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) { // Need to generate a consumer tag on the client so we can exploit the nowait flag. String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); @@ -973,7 +975,6 @@ namespace Qpid.Client public void AcknowledgeMessage(ulong deliveryTag, bool multiple) { AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); - _logger.Info("XXX sending ack: " + ackFrame); if (_logger.IsDebugEnabled) { _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 611a4e5351..133707c609 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -82,10 +82,15 @@ namespace Qpid.Client /// </summary> private readonly object _syncLock = new object(); - /** - * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover - */ - private int _prefetch; + /// <summary> + /// We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchHigh; + + /// <summary> + /// We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of failover + /// </summary> + private int _prefetchLow; /// <summary> /// When true indicates that either a message listener is set or that @@ -108,8 +113,20 @@ namespace Qpid.Client /// </summary> private long _lastDeliveryTag; - public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, - MessageFactoryRegistry messageFactory, AmqChannel channel) + /// <summary> + /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode + /// </summary> + private int _outstanding; + + /// <summary> + /// Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. + /// Enabled when _outstannding number of msgs >= _prefetchHigh and disabled at < _prefetchLow + /// </summary> + private bool _dups_ok_acknowledge_send; + + internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, + MessageFactoryRegistry messageFactory, AmqChannel channel, + int prefetchHigh, int prefetchLow, bool exclusive) { _channelId = channelId; _queueName = queueName; @@ -117,6 +134,9 @@ namespace Qpid.Client _messageFactory = messageFactory; _channel = channel; _acknowledgeMode = _channel.AcknowledgeMode; + _prefetchHigh = prefetchHigh; + _prefetchLow = prefetchLow; + _exclusive = exclusive; } #region IMessageConsumer Members @@ -302,65 +322,6 @@ namespace Qpid.Client } } -// /// <summary> -// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case -// /// of a message listener or a synchronous receive() caller. -// /// </summary> -// /// <param name="messageFrame">the raw unprocessed mesage</param> -// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param> -// /// <param name="channelId">channel on which this message was sent</param> -// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) -// { -// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); -// if (_logger.IsDebugEnabled) -// { -// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); -// } -// try -// { -// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, -// messageFrame.DeliverBody.Redelivered, -// messageFrame.ContentHeader, -// messageFrame.Bodies); - -// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) -// { -// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); -// }*/ -// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) -// { -// // we set the session so that when the user calls acknowledge() it can call the method on session -// // to send out the appropriate frame -// jmsMessage.Channel = _channel; -// } - -// lock (_syncLock) -// { -// if (_messageListener != null) -// { -//#if __MonoCS__ -// _messageListener(jmsMessage); -//#else -// _messageListener.Invoke(jmsMessage); -//#endif -// } -// else -// { -// _synchronousQueue.Enqueue(jmsMessage); -// } -// } -// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) -// { -// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); -// } -// } -// catch (Exception e) -// { -// _logger.Error("Caught exception (dump follows) - ignoring...", e); -// } -// } - - /** * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case * of a message listener or a synchronous receive() caller. @@ -465,11 +426,6 @@ namespace Qpid.Client DeregisterConsumer(); } - public int Prefetch - { - get { return _prefetch; } - } - public string QueueName { get { return _queueName; } @@ -509,7 +465,6 @@ namespace Qpid.Client AbstractQmsMessage msg = (AbstractQmsMessage) m; switch (AcknowledgeMode) { -/* TODO case AcknowledgeMode.DupsOkAcknowledge: if (++_outstanding >= _prefetchHigh) { @@ -519,16 +474,16 @@ namespace Qpid.Client { _dups_ok_acknowledge_send = false; } - if (_dups_ok_acknowledge_send) { - _channel.AcknowledgeMessage(msg.getDeliveryTag(), true); + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); } break; - */ + case AcknowledgeMode.AutoAcknowledge: _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); break; + case AcknowledgeMode.SessionTransacted: _lastDeliveryTag = msg.DeliveryTag; break; |
