summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-29 05:51:43 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-29 05:51:43 +0000
commit1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917 (patch)
tree8012603b18bae10e2c781debf36a5cd9b7e89bb6 /dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
parent74abf7b06d6d3d4d291f87d750500ceded74dd1b (diff)
downloadqpid-python-1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917.tar.gz
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/BasicMessageConsumer.cs')
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs170
1 files changed, 125 insertions, 45 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 1c9a009174..ffd19e9500 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -50,15 +50,18 @@ namespace Qpid.Client
set { _noLocal = value; }
}
- AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge;
-
public AcknowledgeMode AcknowledgeMode
{
- get { return _acknowledgeMode; }
+ get { return _channel.AcknowledgeMode; }
}
private MessageReceivedDelegate _messageListener;
+ private bool IsMessageListenerSet
+ {
+ get { return _messageListener != null; }
+ }
+
/// <summary>
/// The consumer tag allows us to close the consumer by sending a jmsCancel method to the
/// broker
@@ -173,12 +176,7 @@ namespace Qpid.Client
o = _synchronousQueue.DequeueBlocking();
}
- IMessage m = ReturnMessageOrThrow(o);
- if (m != null)
- {
- PostDeliver(m);
- }
- return m;
+ return ReturnMessageOrThrowAndPostDeliver(o);
}
finally
{
@@ -189,6 +187,16 @@ namespace Qpid.Client
}
}
+ private IMessage ReturnMessageOrThrowAndPostDeliver(object o)
+ {
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
+ }
+
public IMessage Receive()
{
return Receive(0);
@@ -211,8 +219,14 @@ namespace Qpid.Client
try
{
- object o = _synchronousQueue.Dequeue();
- return ReturnMessageOrThrow(o);
+ if (_synchronousQueue.Count > 0)
+ {
+ return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
+ }
+ else
+ {
+ return null;
+ }
}
finally
{
@@ -285,14 +299,73 @@ namespace Qpid.Client
}
}
- /// <summary>
- /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case
- /// of a message listener or a synchronous receive() caller.
- /// </summary>
- /// <param name="messageFrame">the raw unprocessed mesage</param>
- /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param>
- /// <param name="channelId">channel on which this message was sent</param>
- internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId)
+// /// <summary>
+// /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case
+// /// of a message listener or a synchronous receive() caller.
+// /// </summary>
+// /// <param name="messageFrame">the raw unprocessed mesage</param>
+// /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param>
+// /// <param name="channelId">channel on which this message was sent</param>
+// internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId)
+// {
+// _logger.Info("XXX notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
+// if (_logger.IsDebugEnabled)
+// {
+// _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
+// }
+// try
+// {
+// AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
+// messageFrame.DeliverBody.Redelivered,
+// messageFrame.ContentHeader,
+// messageFrame.Bodies);
+
+// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
+// {
+// _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
+// }*/
+// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+// {
+// // we set the session so that when the user calls acknowledge() it can call the method on session
+// // to send out the appropriate frame
+// jmsMessage.Channel = _channel;
+// }
+
+// lock (_syncLock)
+// {
+// if (_messageListener != null)
+// {
+//#if __MonoCS__
+// _messageListener(jmsMessage);
+//#else
+// _messageListener.Invoke(jmsMessage);
+//#endif
+// }
+// else
+// {
+// _synchronousQueue.Enqueue(jmsMessage);
+// }
+// }
+// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+// {
+// _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+// }
+// }
+// catch (Exception e)
+// {
+// _logger.Error("Caught exception (dump follows) - ignoring...", e);
+// }
+// }
+
+
+ /**
+ * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case
+ * of a message listener or a synchronous receive() caller.
+ *
+ * @param messageFrame the raw unprocessed mesage
+ * @param channelId channel on which this message was sent
+ */
+ internal void NotifyMessage(UnprocessedMessage messageFrame, int channelId)
{
if (_logger.IsDebugEnabled)
{
@@ -300,48 +373,38 @@ namespace Qpid.Client
}
try
{
- AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag,
+ AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
messageFrame.DeliverBody.Redelivered,
messageFrame.ContentHeader,
messageFrame.Bodies);
- /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
- {
- _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
- }*/
- if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
- {
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- jmsMessage.Channel = _channel;
- }
+ _logger.Debug("Message is of type: " + jmsMessage.GetType().Name);
- lock (_syncLock)
+ PreDeliver(jmsMessage);
+
+ if (IsMessageListenerSet)
{
- if (_messageListener != null)
- {
+ // We do not need a lock around the test above, and the dispatch below as it is invalid
+ // for an application to alter an installed listener while the session is started.
#if __MonoCS__
_messageListener(jmsMessage);
#else
- _messageListener.Invoke(jmsMessage);
+ _messageListener.Invoke(jmsMessage);
#endif
- }
- else
- {
- _synchronousQueue.Enqueue(jmsMessage);
- }
+ PostDeliver(jmsMessage);
}
- if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+ else
{
- _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+ _synchronousQueue.Enqueue(jmsMessage);
}
}
catch (Exception e)
{
- _logger.Error("Caught exception (dump follows) - ignoring...", e);
+ _logger.Error("Caught exception (dump follows) - ignoring...", e); // FIXME
}
}
+
internal void NotifyError(Exception cause)
{
lock (_syncLock)
@@ -416,15 +479,32 @@ namespace Qpid.Client
{
if (_lastDeliveryTag > 0)
{
- _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+ _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast
_lastDeliveryTag = -1;
}
}
+ private void PreDeliver(AbstractQmsMessage msg)
+ {
+ switch (AcknowledgeMode)
+ {
+ case AcknowledgeMode.PreAcknowledge:
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false);
+ break;
+
+ case AcknowledgeMode.ClientAcknowledge:
+ // We set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame.
+ //msg.setAMQSession(_session);
+ msg.Channel = _channel;
+ break;
+ }
+ }
+
private void PostDeliver(IMessage m)
{
AbstractQmsMessage msg = (AbstractQmsMessage) m;
- switch (_acknowledgeMode)
+ switch (AcknowledgeMode)
{
/* TODO
case AcknowledgeMode.DupsOkAcknowledge:
@@ -444,7 +524,7 @@ namespace Qpid.Client
break;
*/
case AcknowledgeMode.AutoAcknowledge:
- _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
break;
case AcknowledgeMode.SessionTransacted:
_lastDeliveryTag = msg.DeliveryTag;