diff options
| author | Robert Greig <rgreig@apache.org> | 2007-02-27 15:45:33 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2007-02-27 15:45:33 +0000 |
| commit | a20abbb7c6c6ac4f7105cb57544375e7058f3904 (patch) | |
| tree | 85a2c0a4ca94365e99c5e307f147012f2761048e /dotnet/Qpid.Client/Client | |
| parent | 3bfaba7fd65f251b68e8c4085582a4b62edf8e5d (diff) | |
| download | qpid-python-a20abbb7c6c6ac4f7105cb57544375e7058f3904.tar.gz | |
(Patch submitted by Tomas Restrepo) QPID-354.
With the patch, blocking receive calls with and without timeouts work. Added new unit test class to support functionality added.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@512288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client')
| -rw-r--r-- | dotnet/Qpid.Client/Client/BasicMessageConsumer.cs | 59 |
1 files changed, 9 insertions, 50 deletions
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 133707c609..796a878eec 100644 --- a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -100,9 +100,9 @@ namespace Qpid.Client /// <summary> /// Used in the blocking receive methods to receive a message from - /// the Channel thread. Argument true indicates we want strict FIFO semantics + /// the Channel thread. /// </summary> - private readonly SynchronousQueue _synchronousQueue = new SynchronousQueue(true); + private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue(); private MessageFactoryRegistry _messageFactory; @@ -188,17 +188,8 @@ namespace Qpid.Client try { - object o; - if (delay > 0) - { - //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS); - throw new NotImplementedException("Need to implement synchronousQueue.Poll(timeout"); - } - else - { - o = _synchronousQueue.DequeueBlocking(); - } - + object o = _messageQueue.Dequeue(delay); + return ReturnMessageOrThrowAndPostDeliver(o); } finally @@ -222,42 +213,12 @@ namespace Qpid.Client public IMessage Receive() { - return Receive(0); + return Receive(Timeout.Infinite); } public IMessage ReceiveNoWait() { - CheckNotClosed(); - - lock (_syncLock) - { - // If someone is already receiving - if (_receiving) - { - throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)..."); - } - - _receiving = true; - } - - try - { - if (_synchronousQueue.Count > 0) - { - return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue()); - } - else - { - return null; - } - } - finally - { - lock (_syncLock) - { - _receiving = false; - } - } + return Receive(0); } #endregion @@ -359,7 +320,7 @@ namespace Qpid.Client } else { - _synchronousQueue.Enqueue(jmsMessage); + _messageQueue.Enqueue(jmsMessage); } } catch (Exception e) @@ -380,10 +341,8 @@ namespace Qpid.Client if (_messageListener == null) { // offer only succeeds if there is a thread waiting for an item from the queue - if (_synchronousQueue.EnqueueNoThrow(cause)) - { - _logger.Debug("Passed exception to synchronous queue for propagation to receive()"); - } + _messageQueue.Enqueue(cause); + _logger.Debug("Passed exception to synchronous queue for propagation to receive()"); } DeregisterConsumer(); } |
