summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client.Tests
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-29 05:51:43 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-29 05:51:43 +0000
commit1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917 (patch)
tree8012603b18bae10e2c781debf36a5cd9b7e89bb6 /dotnet/Qpid.Client.Tests
parent74abf7b06d6d3d4d291f87d750500ceded74dd1b (diff)
downloadqpid-python-1df5c37bfa8fe1ec20e41dfd2bb06bc10a790917.tar.gz
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@480423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client.Tests')
-rw-r--r--dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs65
1 files changed, 55 insertions, 10 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
index eb08ca9446..79a04e79eb 100644
--- a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
+++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
@@ -35,12 +35,12 @@ namespace Qpid.Client.Tests.failover
const int NUM_ITERATIONS = 10;
const int NUM_COMMITED_MESSAGES = 10;
- const int NUM_ROLLEDBACK_MESSAGES = 5;
+ const int NUM_ROLLEDBACK_MESSAGES = 3;
const int SLEEP_MILLIS = 500;
AMQConnection _connection;
- public void onMessage(IMessage message)
+ public void OnMessage(IMessage message)
{
try
{
@@ -48,7 +48,40 @@ namespace Qpid.Client.Tests.failover
}
catch (QpidException e)
{
- error(e);
+ Error(e);
+ }
+ }
+
+ class NoWaitConsumer
+ {
+ FailoverTxTest _failoverTxTest;
+ IMessageConsumer _consumer;
+
+ internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel)
+ {
+ _failoverTxTest = failoverTxTest;
+ _consumer = channel;
+ }
+
+ internal void Run()
+ {
+ int messages = 0;
+ while (messages < NUM_COMMITED_MESSAGES)
+ {
+ IMessage msg = _consumer.ReceiveNoWait();
+ if (msg != null)
+ {
+ _log.Info("NoWait received message");
+ ++messages;
+ _failoverTxTest.OnMessage(msg);
+ }
+ else
+ {
+ Thread.Sleep(1);
+ }
+
+ }
+
}
}
@@ -60,7 +93,7 @@ namespace Qpid.Client.Tests.failover
_log.Info("connectionInfo = " + connectionInfo);
_log.Info("connection.asUrl = " + _connection.toURL());
- IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
+ IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
string queueName = receivingChannel.GenerateUniqueName();
@@ -70,11 +103,23 @@ namespace Qpid.Client.Tests.failover
// No need to call Queue.Bind as automatically bound to default direct exchange.
receivingChannel.Bind(queueName, "amq.direct", queueName);
- receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create();
+ bool useThread = true;
+ if (useThread)
+ {
+ NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer);
+ new Thread(noWaitConsumer.Run).Start();
+ }
+ else
+ {
+ //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
_connection.Start();
- publishInTx(queueName);
+ PublishInTx(queueName);
Thread.Sleep(2000); // Wait a while for last messages.
@@ -82,7 +127,7 @@ namespace Qpid.Client.Tests.failover
_log.Info("FailoverTxText complete");
}
- private void publishInTx(string routingKey)
+ private void PublishInTx(string routingKey)
{
_log.Info("sendInTx");
bool transacted = true;
@@ -113,13 +158,13 @@ namespace Qpid.Client.Tests.failover
}
}
- private void error(Exception e)
+ private void Error(Exception e)
{
_log.Fatal("Exception received. About to stop.", e);
- stop();
+ Stop();
}
- private void stop()
+ private void Stop()
{
_log.Info("Stopping...");
try