summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-02-27 15:45:33 +0000
committerRobert Greig <rgreig@apache.org>2007-02-27 15:45:33 +0000
commit1608fb9657ee5c322a74bd3bdc69428d6a26ff6a (patch)
tree3c98f2c782d4493846c0e90d5045d2bd99e2b215
parenta019367dc582d61fa3739f385592c0baf9b972b8 (diff)
downloadqpid-python-1608fb9657ee5c322a74bd3bdc69428d6a26ff6a.tar.gz
(Patch submitted by Tomas Restrepo) QPID-354.
With the patch, blocking receive calls with and without timeouts work. Added new unit test class to support functionality added. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@512288 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj1
-rw-r--r--qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs127
-rw-r--r--qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs59
-rw-r--r--qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj1
-rw-r--r--qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs85
-rw-r--r--qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs113
-rw-r--r--qpid/dotnet/Qpid.Common/Qpid.Common.csproj1
7 files changed, 337 insertions, 50 deletions
diff --git a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
index 5674174f69..7fb96536a7 100644
--- a/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
+++ b/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
@@ -55,6 +55,7 @@
<Compile Include="requestreply1\ServiceProvidingClient.cs" />
<Compile Include="requestreply1\ServiceRequestingClient.cs" />
<Compile Include="Security\CallbackHandlerRegistryTests.cs" />
+ <Compile Include="SimpleConsumer\TestSyncConsumer.cs" />
<Compile Include="undeliverable\UndeliverableTest.cs" />
<Compile Include="url\ConnectionUrlTest.cs" />
</ItemGroup>
diff --git a/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs b/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
new file mode 100644
index 0000000000..622c7c17c7
--- /dev/null
+++ b/qpid/dotnet/Qpid.Client.Tests/SimpleConsumer/TestSyncConsumer.cs
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests
+{
+ [TestFixture]
+ public class TestSyncConsumer : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(TestSyncConsumer));
+
+ private string _commandQueueName = "ServiceQ1";
+ private const int MESSAGE_COUNT = 1000;
+ private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+ int count = 0;
+ while ( count < size + MESSAGE_DATA_BYTES.Length )
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.Length;
+ }
+ if ( count < size )
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.ToString();
+ }
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .Create();
+
+ _publisher.DisableMessageTimestamp = true;
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
+
+ _consumer = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100).Create();
+ _connection.Start();
+ }
+
+ [Test]
+ public void ReceiveWithInfiniteWait()
+ {
+ // send all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(GetData(512 + 8 * i));
+ } catch ( Exception e )
+ {
+ _logger.Error("Error creating message: " + e, e);
+ break;
+ }
+ _publisher.Send(msg);
+ }
+
+ _logger.Error("All messages sent");
+ // receive all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ try
+ {
+ IMessage msg = _consumer.Receive();
+ Assert.IsNotNull(msg);
+ } catch ( Exception e )
+ {
+ _logger.Error("Error receiving message: " + e, e);
+ Assert.Fail(e.ToString());
+ }
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTimeout()
+ {
+ ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8));
+ _publisher.Send(msg);
+
+ IMessage recvMsg = _consumer.Receive();
+ Assert.IsNotNull(recvMsg);
+ // empty queue, should timeout
+ Assert.IsNull(_consumer.Receive(1000));
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
index 133707c609..796a878eec 100644
--- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
+++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -100,9 +100,9 @@ namespace Qpid.Client
/// <summary>
/// Used in the blocking receive methods to receive a message from
- /// the Channel thread. Argument true indicates we want strict FIFO semantics
+ /// the Channel thread.
/// </summary>
- private readonly SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+ private readonly ConsumerProducerQueue _messageQueue = new ConsumerProducerQueue();
private MessageFactoryRegistry _messageFactory;
@@ -188,17 +188,8 @@ namespace Qpid.Client
try
{
- object o;
- if (delay > 0)
- {
- //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS);
- throw new NotImplementedException("Need to implement synchronousQueue.Poll(timeout");
- }
- else
- {
- o = _synchronousQueue.DequeueBlocking();
- }
-
+ object o = _messageQueue.Dequeue(delay);
+
return ReturnMessageOrThrowAndPostDeliver(o);
}
finally
@@ -222,42 +213,12 @@ namespace Qpid.Client
public IMessage Receive()
{
- return Receive(0);
+ return Receive(Timeout.Infinite);
}
public IMessage ReceiveNoWait()
{
- CheckNotClosed();
-
- lock (_syncLock)
- {
- // If someone is already receiving
- if (_receiving)
- {
- throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)...");
- }
-
- _receiving = true;
- }
-
- try
- {
- if (_synchronousQueue.Count > 0)
- {
- return ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
- }
- else
- {
- return null;
- }
- }
- finally
- {
- lock (_syncLock)
- {
- _receiving = false;
- }
- }
+ return Receive(0);
}
#endregion
@@ -359,7 +320,7 @@ namespace Qpid.Client
}
else
{
- _synchronousQueue.Enqueue(jmsMessage);
+ _messageQueue.Enqueue(jmsMessage);
}
}
catch (Exception e)
@@ -380,10 +341,8 @@ namespace Qpid.Client
if (_messageListener == null)
{
// offer only succeeds if there is a thread waiting for an item from the queue
- if (_synchronousQueue.EnqueueNoThrow(cause))
- {
- _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
- }
+ _messageQueue.Enqueue(cause);
+ _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
}
DeregisterConsumer();
}
diff --git a/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj b/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj
index 684d21b324..7049e9b6bb 100644
--- a/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj
+++ b/qpid/dotnet/Qpid.Common.Tests/Qpid.Common.Tests.csproj
@@ -39,6 +39,7 @@
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Qpid\Collections\TestConsumerProducerQueue.cs" />
<Compile Include="Qpid\Collections\TestLinkedHashtable.cs" />
<Compile Include="Qpid\Framing\TestEncodingUtils.cs" />
<Compile Include="Qpid\Framing\TestAMQType.cs" />
diff --git a/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs b/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
new file mode 100644
index 0000000000..1a400aab68
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using NUnit.Framework;
+using Qpid.Collections;
+
+namespace Qpid.Collections.Tests
+{
+ [TestFixture]
+ public class TestConsumerProducerQueue
+ {
+ private ConsumerProducerQueue _queue;
+
+ [SetUp]
+ public void SetUp()
+ {
+ _queue = new ConsumerProducerQueue();
+ }
+
+ [Test]
+ public void CanDequeueWithInifiniteWait()
+ {
+ Thread producer = new Thread(new ThreadStart(ProduceFive));
+ producer.Start();
+ for ( int i = 0; i < 5; i++ )
+ {
+ object item = _queue.Dequeue();
+ Assert.IsNotNull(item);
+ }
+ }
+
+ [Test]
+ public void ReturnsNullOnDequeueTimeout()
+ {
+ // queue is empty
+ Assert.IsNull(_queue.Dequeue(500));
+ }
+
+ [Test]
+ public void DequeueTillEmpty()
+ {
+ _queue.Enqueue(1);
+ _queue.Enqueue(2);
+ _queue.Enqueue(3);
+ Assert.AreEqual(1, _queue.Dequeue());
+ Assert.AreEqual(2, _queue.Dequeue());
+ Assert.AreEqual(3, _queue.Dequeue());
+ // no messages in queue, will timeout
+ Assert.IsNull(_queue.Dequeue(500));
+ }
+
+
+ private void ProduceFive()
+ {
+ Thread.Sleep(1000);
+ _queue.Enqueue("test item 1");
+ _queue.Enqueue("test item 2");
+ _queue.Enqueue("test item 3");
+ Thread.Sleep(0);
+ _queue.Enqueue("test item 4");
+ _queue.Enqueue("test item 5");
+ }
+ }
+}
diff --git a/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
new file mode 100644
index 0000000000..1e21284608
--- /dev/null
+++ b/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Qpid.Collections
+{
+ /// <summary>
+ /// Simple FIFO queue to support multi-threaded consumer
+ /// and producers. It supports timeouts in dequeue operations.
+ /// </summary>
+ public sealed class ConsumerProducerQueue
+ {
+ private Queue _queue = new Queue();
+ private WaitSemaphore _semaphore = new WaitSemaphore();
+
+ /// <summary>
+ /// Put an item into the tail of the queue
+ /// </summary>
+ /// <param name="item"></param>
+ public void Enqueue(object item)
+ {
+ lock ( _queue.SyncRoot )
+ {
+ _queue.Enqueue(item);
+ _semaphore.Increment();
+ }
+ }
+
+ /// <summary>
+ /// Wait indefinitely for an item to be available
+ /// on the queue.
+ /// </summary>
+ /// <returns>The object at the head of the queue</returns>
+ public object Dequeue()
+ {
+ return Dequeue(Timeout.Infinite);
+ }
+
+ /// <summary>
+ /// Wait up to the number of milliseconds specified
+ /// for an item to be available on the queue
+ /// </summary>
+ /// <param name="timeout">Number of milliseconds to wait</param>
+ /// <returns>The object at the head of the queue, or null
+ /// if the timeout expires</returns>
+ public object Dequeue(long timeout)
+ {
+ if ( _semaphore.Decrement(timeout) )
+ {
+ lock ( _queue.SyncRoot )
+ {
+ return _queue.Dequeue();
+ }
+ }
+ return null;
+ }
+
+ #region Simple Semaphore
+ //
+ // Simple Semaphore
+ //
+
+ class WaitSemaphore
+ {
+ private int _count;
+ private AutoResetEvent _event = new AutoResetEvent(false);
+
+ public void Increment()
+ {
+ Interlocked.Increment(ref _count);
+ _event.Set();
+ }
+
+ public bool Decrement(long timeout)
+ {
+ if ( timeout > int.MaxValue )
+ throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
+
+ int millis = (int) (timeout & 0x7FFFFFFF);
+ if ( Interlocked.Decrement(ref _count) > 0 )
+ {
+ // there are messages in queue, so no need to wait
+ return true;
+ } else
+ {
+ return _event.WaitOne(millis, false);
+ }
+ }
+ }
+ #endregion // Simple Semaphore
+ }
+}
diff --git a/qpid/dotnet/Qpid.Common/Qpid.Common.csproj b/qpid/dotnet/Qpid.Common/Qpid.Common.csproj
index dccadd4667..ec85567a25 100644
--- a/qpid/dotnet/Qpid.Common/Qpid.Common.csproj
+++ b/qpid/dotnet/Qpid.Common/Qpid.Common.csproj
@@ -47,6 +47,7 @@
<Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
+ <Compile Include="Collections\ConsumerProducerQueue.cs" />
<Compile Include="Framing\AMQDataBlockDecoder.cs" />
<Compile Include="Framing\AMQDataBlockEncoder.cs" />
<Compile Include="Framing\AMQFrame.cs" />