diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-29 05:51:43 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-29 05:51:43 +0000 |
| commit | 1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917 (patch) | |
| tree | 8012603b18bae10e2c781debf36a5cd9b7e89bb6 /dotnet/Qpid.Client.Tests | |
| parent | 74abf7b06d6d3d4d291f87d750500ceded74dd1b (diff) | |
| download | qpid-python-1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917.tar.gz | |
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client.Tests')
| -rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs | 65 |
1 files changed, 55 insertions, 10 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs index eb08ca9446..79a04e79eb 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -35,12 +35,12 @@ namespace Qpid.Client.Tests.failover const int NUM_ITERATIONS = 10; const int NUM_COMMITED_MESSAGES = 10; - const int NUM_ROLLEDBACK_MESSAGES = 5; + const int NUM_ROLLEDBACK_MESSAGES = 3; const int SLEEP_MILLIS = 500; AMQConnection _connection; - public void onMessage(IMessage message) + public void OnMessage(IMessage message) { try { @@ -48,7 +48,40 @@ namespace Qpid.Client.Tests.failover } catch (QpidException e) { - error(e); + Error(e); + } + } + + class NoWaitConsumer + { + FailoverTxTest _failoverTxTest; + IMessageConsumer _consumer; + + internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel) + { + _failoverTxTest = failoverTxTest; + _consumer = channel; + } + + internal void Run() + { + int messages = 0; + while (messages < NUM_COMMITED_MESSAGES) + { + IMessage msg = _consumer.ReceiveNoWait(); + if (msg != null) + { + _log.Info("NoWait received message"); + ++messages; + _failoverTxTest.OnMessage(msg); + } + else + { + Thread.Sleep(1); + } + + } + } } @@ -60,7 +93,7 @@ namespace Qpid.Client.Tests.failover _log.Info("connectionInfo = " + connectionInfo); _log.Info("connection.asUrl = " + _connection.toURL()); - IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge); string queueName = receivingChannel.GenerateUniqueName(); @@ -70,11 +103,23 @@ namespace Qpid.Client.Tests.failover // No need to call Queue.Bind as automatically bound to default direct exchange. receivingChannel.Bind(queueName, "amq.direct", queueName); - receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); + + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create(); + bool useThread = true; + if (useThread) + { + NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer); + new Thread(noWaitConsumer.Run).Start(); + } + else + { + //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + } _connection.Start(); - publishInTx(queueName); + PublishInTx(queueName); Thread.Sleep(2000); // Wait a while for last messages. @@ -82,7 +127,7 @@ namespace Qpid.Client.Tests.failover _log.Info("FailoverTxText complete"); } - private void publishInTx(string routingKey) + private void PublishInTx(string routingKey) { _log.Info("sendInTx"); bool transacted = true; @@ -113,13 +158,13 @@ namespace Qpid.Client.Tests.failover } } - private void error(Exception e) + private void Error(Exception e) { _log.Fatal("Exception received. About to stop.", e); - stop(); + Stop(); } - private void stop() + private void Stop() { _log.Info("Stopping..."); try |
