diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-30 18:54:48 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-30 18:54:48 +0000 |
| commit | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (patch) | |
| tree | 1fdc64001d5e0bf1f34883927d7901b456b7bd3b /dotnet/Qpid.Client.Tests/failover | |
| parent | 8f21f5d6cacd35e6fe04a0b4a5567fc4929f997e (diff) | |
| download | qpid-python-33c04c7e619a65e2d92ac231805e8ad27f4a29c2.tar.gz | |
QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow
QPID-137 Ported AcknowledgeModes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481035 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client.Tests/failover')
| -rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTest.cs | 8 | ||||
| -rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs | 33 |
2 files changed, 27 insertions, 14 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs index 952cc2a58e..52ef76c559 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs @@ -78,8 +78,8 @@ namespace Qpid.Client.Tests.failover // _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); _publisher = _channel.CreatePublisherBuilder() - .withRoutingKey(routingKey) - .withExchangeName(exchangeName) + .WithRoutingKey(routingKey) + .WithExchangeName(exchangeName) .Create(); _publisher.Send(msg); } @@ -206,8 +206,8 @@ namespace Qpid.Client.Tests.failover //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); return _session.CreatePublisherBuilder() - .withExchangeName(exchangeName) - .withRoutingKey(routingKey) + .WithExchangeName(exchangeName) + .WithRoutingKey(routingKey) .Create(); } } diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs index 79a04e79eb..4e95c12290 100644 --- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -36,8 +36,11 @@ namespace Qpid.Client.Tests.failover const int NUM_ITERATIONS = 10; const int NUM_COMMITED_MESSAGES = 10; const int NUM_ROLLEDBACK_MESSAGES = 3; - const int SLEEP_MILLIS = 500; + const int SLEEP_MILLIS = 50; + // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge, NoAcknowledge, PreAcknowledge + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge; + const bool _noWait = true; // use Receive or ReceiveNoWait AMQConnection _connection; public void OnMessage(IMessage message) @@ -45,6 +48,11 @@ namespace Qpid.Client.Tests.failover try { _log.Info("Received: " + ((ITextMessage) message).Text); + if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + _log.Info("client acknowledging"); + message.Acknowledge(); + } } catch (QpidException e) { @@ -56,11 +64,13 @@ namespace Qpid.Client.Tests.failover { FailoverTxTest _failoverTxTest; IMessageConsumer _consumer; + private bool _noWait; - internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel) + internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel, bool noWait) { _failoverTxTest = failoverTxTest; _consumer = channel; + _noWait = noWait; } internal void Run() @@ -68,7 +78,9 @@ namespace Qpid.Client.Tests.failover int messages = 0; while (messages < NUM_COMMITED_MESSAGES) { - IMessage msg = _consumer.ReceiveNoWait(); + IMessage msg; + if (_noWait) msg = _consumer.ReceiveNoWait(); + else msg = _consumer.Receive(); if (msg != null) { _log.Info("NoWait received message"); @@ -93,7 +105,8 @@ namespace Qpid.Client.Tests.failover _log.Info("connectionInfo = " + connectionInfo); _log.Info("connection.asUrl = " + _connection.toURL()); - IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge); + _log.Info("AcknowledgeMode is " + _acknowledgeMode); + IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode); string queueName = receivingChannel.GenerateUniqueName(); @@ -103,17 +116,17 @@ namespace Qpid.Client.Tests.failover // No need to call Queue.Bind as automatically bound to default direct exchange. receivingChannel.Bind(queueName, "amq.direct", queueName); - - IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create(); - bool useThread = true; + IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(30) + .WithPrefetchHigh(60).Create(); + bool useThread = false; if (useThread) { - NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer); + NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer, _noWait); new Thread(noWaitConsumer.Run).Start(); } else { - //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); consumer.OnMessage = new MessageReceivedDelegate(OnMessage); } @@ -133,7 +146,7 @@ namespace Qpid.Client.Tests.failover bool transacted = true; IChannel publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); IMessagePublisher publisher = publishingChannel.CreatePublisherBuilder() - .withRoutingKey(routingKey) + .WithRoutingKey(routingKey) .Create(); for (int i = 1; i <= NUM_ITERATIONS; ++i) |
