diff options
author | Steven Shaw <steshaw@apache.org> | 2006-11-29 05:51:43 +0000 |
---|---|---|
committer | Steven Shaw <steshaw@apache.org> | 2006-11-29 05:51:43 +0000 |
commit | 1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917 (patch) | |
tree | 8012603b18bae10e2c781debf36a5cd9b7e89bb6 /dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | |
parent | 74abf7b06d6d3d4d291f87d750500ceded74dd1b (diff) | |
download | qpid-python-1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917.tar.gz |
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageConsumer.cs')
-rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 170 |
1 files changed, 125 insertions, 45 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 1c9a009174..ffd19e9500 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -50,15 +50,18 @@ namespace Qpid.Client set { _noLocal = value; } } - AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge; - public AcknowledgeMode AcknowledgeMode { - get { return _acknowledgeMode; } + get { return _channel.AcknowledgeMode; } } private MessageReceivedDelegate _messageListener; + private bool IsMessageListenerSet + { + get { return _messageListener != null; } + } + /// <summary> /// The consumer tag allows us to close the consumer by sending a jmsCancel method to the /// broker @@ -173,12 +176,7 @@ namespace Qpid.Client o = _synchronousQueue.DequeueBlocking(); } - IMessage m = ReturnMessageOrThrow(o); - if (m != null) - { - PostDeliver(m); - } - return m; + return ReturnMessageOrThrowAndPostDeliver(o); } finally { @@ -189,6 +187,16 @@ namespace Qpid.Client } } + private IMessage ReturnMessageOrThrowAndPostDeliver(object o) + { + IMessage m = ReturnMessageOrThrow(o); + if (m != null) + { + PostDeliver(m); + } + return m; + } + public IMessage Receive() { return Receive(0); @@ -211,8 +219,14 @@ namespace Qpid.Client try { - object o = _synchronousQueue.Dequeue(); - return ReturnMessageOrThrow(o); + if (_synchronousQueue.Count > 0) + { + return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue()); + } + else + { + return null; + } } finally { @@ -285,14 +299,73 @@ namespace Qpid.Client } } - /// <summary> - /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case - /// of a message listener or a synchronous receive() caller. - /// </summary> - /// <param name="messageFrame">the raw unprocessed mesage</param> - /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param> - /// <param name="channelId">channel on which this message was sent</param> - internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) +// /// <summary> +// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case +// /// of a message listener or a synchronous receive() caller. +// /// </summary> +// /// <param name="messageFrame">the raw unprocessed mesage</param> +// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param> +// /// <param name="channelId">channel on which this message was sent</param> +// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) +// { +// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); +// if (_logger.IsDebugEnabled) +// { +// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); +// } +// try +// { +// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, +// messageFrame.DeliverBody.Redelivered, +// messageFrame.ContentHeader, +// messageFrame.Bodies); + +// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) +// { +// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); +// }*/ +// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) +// { +// // we set the session so that when the user calls acknowledge() it can call the method on session +// // to send out the appropriate frame +// jmsMessage.Channel = _channel; +// } + +// lock (_syncLock) +// { +// if (_messageListener != null) +// { +//#if __MonoCS__ +// _messageListener(jmsMessage); +//#else +// _messageListener.Invoke(jmsMessage); +//#endif +// } +// else +// { +// _synchronousQueue.Enqueue(jmsMessage); +// } +// } +// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) +// { +// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); +// } +// } +// catch (Exception e) +// { +// _logger.Error("Caught exception (dump follows) - ignoring...", e); +// } +// } + + + /** + * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case + * of a message listener or a synchronous receive() caller. + * + * @param messageFrame the raw unprocessed mesage + * @param channelId channel on which this message was sent + */ + internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId) { if (_logger.IsDebugEnabled) { @@ -300,48 +373,38 @@ namespace Qpid.Client } try { - AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag, + AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag, messageFrame.DeliverBody.Redelivered, messageFrame.ContentHeader, messageFrame.Bodies); - /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) - { - _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); - }*/ - if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) - { - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - jmsMessage.Channel = _channel; - } + _logger.Debug("Message is of type: " + jmsMessage.GetType().Name); - lock (_syncLock) + PreDeliver(jmsMessage); + + if (IsMessageListenerSet) { - if (_messageListener != null) - { + // We do not need a lock around the test above, and the dispatch below as it is invalid + // for an application to alter an installed listener while the session is started. #if __MonoCS__ _messageListener(jmsMessage); #else - _messageListener.Invoke(jmsMessage); + _messageListener.Invoke(jmsMessage); #endif - } - else - { - _synchronousQueue.Enqueue(jmsMessage); - } + PostDeliver(jmsMessage); } - if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) + else { - _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); + _synchronousQueue.Enqueue(jmsMessage); } } catch (Exception e) { - _logger.Error("Caught exception (dump follows) - ignoring...", e); + _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME } } + internal void NotifyError(Exception cause) { lock (_syncLock) @@ -416,15 +479,32 @@ namespace Qpid.Client { if (_lastDeliveryTag > 0) { - _channel.AcknowledgeMessage(_lastDeliveryTag, true); + _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast _lastDeliveryTag = -1; } } + private void PreDeliver(AbstractQmsMessage msg) + { + switch (AcknowledgeMode) + { + case AcknowledgeMode.PreAcknowledge: + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false); + break; + + case AcknowledgeMode.ClientAcknowledge: + // We set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame. + //msg.setAMQSession(_session); + msg.Channel = _channel; + break; + } + } + private void PostDeliver(IMessage m) { AbstractQmsMessage msg = (AbstractQmsMessage) m; - switch (_acknowledgeMode) + switch (AcknowledgeMode) { /* TODO case AcknowledgeMode.DupsOkAcknowledge: @@ -444,7 +524,7 @@ namespace Qpid.Client break; */ case AcknowledgeMode.AutoAcknowledge: - _channel.AcknowledgeMessage(msg.DeliveryTag, false); + _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true); break; case AcknowledgeMode.SessionTransacted: _lastDeliveryTag = msg.DeliveryTag; |