summaryrefslogtreecommitdiff
path: root/Final/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'Final/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs')
-rw-r--r--Final/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs127
1 files changed, 127 insertions, 0 deletions
diff --git a/Final/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs b/Final/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
new file mode 100644
index 0000000000..dc3b436a41
--- /dev/null
+++ b/Final/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Client.Tests
+{
+ [TestFixture]
+ public class ProducerMultiConsumer : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumer));
+
+ private string _commandQueueName = "ServiceQ1";
+
+ private const int CONSUMER_COUNT = 5;
+
+ private const int MESSAGE_COUNT = 1000;
+
+ private const string MESSAGE_DATA_BYTES = "****jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ AutoResetEvent _finishedEvent = new AutoResetEvent(false);
+
+ private static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+ int count = 0;
+ while (count < size + MESSAGE_DATA_BYTES.Length)
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.Length;
+ }
+ if (count < size)
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.ToString();
+ }
+
+ private IMessagePublisher _publisher;
+
+ private IMessageConsumer[] _consumers = new IMessageConsumer[CONSUMER_COUNT];
+
+ private int _messageReceivedCount = 0;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .Create();
+
+ _publisher.DisableMessageTimestamp = true;
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ for (int i = 0; i < CONSUMER_COUNT; i++)
+ {
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
+
+ _consumers[i] = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100).Create();
+ _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
+ _connection.Start();
+ }
+
+ public void OnMessage(IMessage m)
+ {
+ int newCount = Interlocked.Increment(ref _messageReceivedCount);
+ if (newCount % 1000 == 0) _logger.Info("Received count=" + newCount);
+ if (newCount == (MESSAGE_COUNT * CONSUMER_COUNT))
+ {
+ _logger.Info("All messages received");
+ _finishedEvent.Set();
+ }
+ if ( newCount % 100 == 0 )
+ System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text);
+ }
+
+ [Test]
+ public void RunTest()
+ {
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(GetData(512 + 8*i));
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Error creating message: " + e, e);
+ break;
+ }
+ _publisher.Send(msg);
+ }
+ _finishedEvent.WaitOne();
+ }
+ }
+}