diff options
| author | Robert Greig <rgreig@apache.org> | 2007-02-27 15:45:33 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2007-02-27 15:45:33 +0000 |
| commit | 1608fb9657ee5c322a74bd3bdc69428d6a26ff6a (patch) | |
| tree | 3c98f2c782d4493846c0e90d5045d2bd99e2b215 | |
| parent | a019367dc582d61fa3739f385592c0baf9b972b8 (diff) | |
| download | qpid-python-1608fb9657ee5c322a74bd3bdc69428d6a26ff6a.tar.gz | |
(Patch submitted by Tomas Restrepo) QPID-354.
With the patch, blocking receive calls with and without timeouts work. Added new unit test class to support functionality added.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@512288 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 337 insertions, 50 deletions
diff --git a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj index 5674174f69..7fb96536a7 100644 --- a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj +++ b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj @@ -55,6 +55,7 @@ <Compile Include="requestreply1\ServiceProvidingClient.cs" />
<Compile Include="requestreply1\ServiceRequestingClient.cs" />
<Compile Include="Security\CallbackHandlerRegistryTests.cs" />
+ <Compile Include="SimpleConsumer\TestSyncConsumer.cs" />
<Compile Include="undeliverable\UndeliverableTest.cs" />
<Compile Include="url\ConnectionUrlTest.cs" />
</ItemGroup>
diff --git a/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs b/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs new file mode 100644 index 0000000000..622c7c17c7 --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs @@ -0,0 +1,127 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests
+{
+ [TestFixture]
+ public class TestSyncConsumer : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(TestSyncConsumer));
+
+ private string _commandQueueName = "ServiceQ1";
+ private const int MESSAGE_COUNT = 1000;
+ private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+ int count = 0;
+ while ( count < size + MESSAGE_DATA_BYTES.Length )
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.Length;
+ }
+ if ( count < size )
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.ToString();
+ }
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .Create();
+
+ _publisher.DisableMessageTimestamp = true;
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
+
+ _consumer = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100).Create();
+ _connection.Start();
+ }
+
+ [Test]
+ public void ReceiveWithInfiniteWait()
+ {
+ // send all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(GetData(512 + 8 * i));
+ } catch ( Exception e )
+ {
+ _logger.Error("Error creating message: " + e, e);
+ break;
+ }
+ _publisher.Send(msg);
+ }
+
+ _logger.Error("All messages sent");
+ // receive all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ try
+ {
+ IMessage msg = _consumer.Receive();
+ Assert.IsNotNull(msg);
+ } catch ( Exception e )
+ {
+ _logger.Error("Error receiving message: " + e, e);
+ Assert.Fail(e.ToString());
+ }
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTimeout()
+ {
+ ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8));
+ _publisher.Send(msg);
+
+ IMessage recvMsg = _consumer.Receive();
+ Assert.IsNotNull(recvMsg);
+ // empty queue, should timeout
+ Assert.IsNull(_consumer.Receive(1000));
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 133707c609..796a878eec 100644 --- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -100,9 +100,9 @@ namespace Qpid.Client /// <summary> /// Used in the blocking receive methods to receive a message from - /// the Channel thread. Argument true indicates we want strict FIFO semantics + /// the Channel thread. /// </summary> - private readonly SynchronousQueue _synchronousQueue = new SynchronousQueue(true); + private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue(); private MessageFactoryRegistry _messageFactory; @@ -188,17 +188,8 @@ namespace Qpid.Client try { - object o; - if (delay > 0) - { - //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS); - throw new NotImplementedException("Need to implement synchronousQueue.Poll(timeout"); - } - else - { - o = _synchronousQueue.DequeueBlocking(); - } - + object o = _messageQueue.Dequeue(delay); + return ReturnMessageOrThrowAndPostDeliver(o); } finally @@ -222,42 +213,12 @@ namespace Qpid.Client public IMessage Receive() { - return Receive(0); + return Receive(Timeout.Infinite); } public IMessage ReceiveNoWait() { - CheckNotClosed(); - - lock (_syncLock) - { - // If someone is already receiving - if (_receiving) - { - throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)..."); - } - - _receiving = true; - } - - try - { - if (_synchronousQueue.Count > 0) - { - return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue()); - } - else - { - return null; - } - } - finally - { - lock (_syncLock) - { - _receiving = false; - } - } + return Receive(0); } #endregion @@ -359,7 +320,7 @@ namespace Qpid.Client } else { - _synchronousQueue.Enqueue(jmsMessage); + _messageQueue.Enqueue(jmsMessage); } } catch (Exception e) @@ -380,10 +341,8 @@ namespace Qpid.Client if (_messageListener == null) { // offer only succeeds if there is a thread waiting for an item from the queue - if (_synchronousQueue.EnqueueNoThrow(cause)) - { - _logger.Debug("Passed exception to synchronous queue for propagation to receive()"); - } + _messageQueue.Enqueue(cause); + _logger.Debug("Passed exception to synchronous queue for propagation to receive()"); } DeregisterConsumer(); } diff --git a/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj b/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj index 684d21b324..7049e9b6bb 100644 --- a/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj +++ b/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj @@ -39,6 +39,7 @@ </ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Qpid\Collections\TestConsumerProducerQueue.cs" />
<Compile Include="Qpid\Collections\TestLinkedHashtable.cs" />
<Compile Include="Qpid\Framing\TestEncodingUtils.cs" />
<Compile Include="Qpid\Framing\TestAMQType.cs" />
diff --git a/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs b/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs new file mode 100644 index 0000000000..1a400aab68 --- /dev/null +++ b/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs @@ -0,0 +1,85 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using NUnit.Framework;
+using Qpid.Collections;
+
+namespace Qpid.Collections.Tests
+{
+ [TestFixture]
+ public class TestConsumerProducerQueue
+ {
+ private ConsumerProducerQueue _queue;
+
+ [SetUp]
+ public void SetUp()
+ {
+ _queue = new ConsumerProducerQueue();
+ }
+
+ [Test]
+ public void CanDequeueWithInifiniteWait()
+ {
+ Thread producer = new Thread(new ThreadStart(ProduceFive));
+ producer.Start();
+ for ( int i = 0; i < 5; i++ )
+ {
+ object item = _queue.Dequeue();
+ Assert.IsNotNull(item);
+ }
+ }
+
+ [Test]
+ public void ReturnsNullOnDequeueTimeout()
+ {
+ // queue is empty
+ Assert.IsNull(_queue.Dequeue(500));
+ }
+
+ [Test]
+ public void DequeueTillEmpty()
+ {
+ _queue.Enqueue(1);
+ _queue.Enqueue(2);
+ _queue.Enqueue(3);
+ Assert.AreEqual(1, _queue.Dequeue());
+ Assert.AreEqual(2, _queue.Dequeue());
+ Assert.AreEqual(3, _queue.Dequeue());
+ // no messages in queue, will timeout
+ Assert.IsNull(_queue.Dequeue(500));
+ }
+
+
+ private void ProduceFive()
+ {
+ Thread.Sleep(1000);
+ _queue.Enqueue("test item 1");
+ _queue.Enqueue("test item 2");
+ _queue.Enqueue("test item 3");
+ Thread.Sleep(0);
+ _queue.Enqueue("test item 4");
+ _queue.Enqueue("test item 5");
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs new file mode 100644 index 0000000000..1e21284608 --- /dev/null +++ b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs @@ -0,0 +1,113 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Qpid.Collections
+{
+ /// <summary>
+ /// Simple FIFO queue to support multi-threaded consumer
+ /// and producers. It supports timeouts in dequeue operations.
+ /// </summary>
+ public sealed class ConsumerProducerQueue
+ {
+ private Queue _queue = new Queue();
+ private WaitSemaphore _semaphore = new WaitSemaphore();
+
+ /// <summary>
+ /// Put an item into the tail of the queue
+ /// </summary>
+ /// <param name="item"></param>
+ public void Enqueue(object item)
+ {
+ lock ( _queue.SyncRoot )
+ {
+ _queue.Enqueue(item);
+ _semaphore.Increment();
+ }
+ }
+
+ /// <summary>
+ /// Wait indefinitely for an item to be available
+ /// on the queue.
+ /// </summary>
+ /// <returns>The object at the head of the queue</returns>
+ public object Dequeue()
+ {
+ return Dequeue(Timeout.Infinite);
+ }
+
+ /// <summary>
+ /// Wait up to the number of milliseconds specified
+ /// for an item to be available on the queue
+ /// </summary>
+ /// <param name="timeout">Number of milliseconds to wait</param>
+ /// <returns>The object at the head of the queue, or null
+ /// if the timeout expires</returns>
+ public object Dequeue(long timeout)
+ {
+ if ( _semaphore.Decrement(timeout) )
+ {
+ lock ( _queue.SyncRoot )
+ {
+ return _queue.Dequeue();
+ }
+ }
+ return null;
+ }
+
+ #region Simple Semaphore
+ //
+ // Simple Semaphore
+ //
+
+ class WaitSemaphore
+ {
+ private int _count;
+ private AutoResetEvent _event = new AutoResetEvent(false);
+
+ public void Increment()
+ {
+ Interlocked.Increment(ref _count);
+ _event.Set();
+ }
+
+ public bool Decrement(long timeout)
+ {
+ if ( timeout > int.MaxValue )
+ throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
+
+ int millis = (int) (timeout & 0x7FFFFFFF);
+ if ( Interlocked.Decrement(ref _count) > 0 )
+ {
+ // there are messages in queue, so no need to wait
+ return true;
+ } else
+ {
+ return _event.WaitOne(millis, false);
+ }
+ }
+ }
+ #endregion // Simple Semaphore
+ }
+}
diff --git a/qpid/dotnet/Qpid.Common/Qpid.Common.csproj b/qpid/dotnet/Qpid.Common/Qpid.Common.csproj index dccadd4667..ec85567a25 100644 --- a/qpid/dotnet/Qpid.Common/Qpid.Common.csproj +++ b/qpid/dotnet/Qpid.Common/Qpid.Common.csproj @@ -47,6 +47,7 @@ <Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
+ <Compile Include="Collections\ConsumerProducerQueue.cs" />
<Compile Include="Framing\AMQDataBlockDecoder.cs" />
<Compile Include="Framing\AMQDataBlockEncoder.cs" />
<Compile Include="Framing\AMQFrame.cs" />
|
