summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-02-27 15:45:33 +0000
committerRobert Greig <rgreig@apache.org>2007-02-27 15:45:33 +0000
commita20abbb7c6c6ac4f7105cb57544375e7058f3904 (patch)
tree85a2c0a4ca94365e99c5e307f147012f2761048e /dotnet/Qpid.Client/Client
parent3bfaba7fd65f251b68e8c4085582a4b62edf8e5d (diff)
downloadqpid-python-a20abbb7c6c6ac4f7105cb57544375e7058f3904.tar.gz
(Patch submitted by Tomas Restrepo) QPID-354.
With the patch, blocking receive calls with and without timeouts work. Added new unit test class to support functionality added. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@512288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs59
1 files changed, 9 insertions, 50 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 133707c609..796a878eec 100644
--- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -100,9 +100,9 @@ namespace Qpid.Client
/// <summary>
/// Used in the blocking receive methods to receive a message from
- /// the Channel thread. Argument true indicates we want strict FIFO semantics
+ /// the Channel thread.
/// </summary>
- private readonly SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+ private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue();
private MessageFactoryRegistry _messageFactory;
@@ -188,17 +188,8 @@ namespace Qpid.Client
try
{
- object o;
- if (delay > 0)
- {
- //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS);
- throw new NotImplementedException("Need to implement synchronousQueue.Poll(timeout");
- }
- else
- {
- o = _synchronousQueue.DequeueBlocking();
- }
-
+ object o = _messageQueue.Dequeue(delay);
+
return ReturnMessageOrThrowAndPostDeliver(o);
}
finally
@@ -222,42 +213,12 @@ namespace Qpid.Client
public IMessage Receive()
{
- return Receive(0);
+ return Receive(Timeout.Infinite);
}
public IMessage ReceiveNoWait()
{
- CheckNotClosed();
-
- lock (_syncLock)
- {
- // If someone is already receiving
- if (_receiving)
- {
- throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)...");
- }
-
- _receiving = true;
- }
-
- try
- {
- if (_synchronousQueue.Count > 0)
- {
- return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
- }
- else
- {
- return null;
- }
- }
- finally
- {
- lock (_syncLock)
- {
- _receiving = false;
- }
- }
+ return Receive(0);
}
#endregion
@@ -359,7 +320,7 @@ namespace Qpid.Client
}
else
{
- _synchronousQueue.Enqueue(jmsMessage);
+ _messageQueue.Enqueue(jmsMessage);
}
}
catch (Exception e)
@@ -380,10 +341,8 @@ namespace Qpid.Client
if (_messageListener == null)
{
// offer only succeeds if there is a thread waiting for an item from the queue
- if (_synchronousQueue.EnqueueNoThrow(cause))
- {
- _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
- }
+ _messageQueue.Enqueue(cause);
+ _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
}
DeregisterConsumer();
}