summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client.Tests/failover
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client.Tests/failover')
-rw-r--r--dotnet/Qpid.Client.Tests/failover/FailoverTest.cs258
-rw-r--r--dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs194
2 files changed, 452 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
new file mode 100644
index 0000000000..6b3ab068f5
--- /dev/null
+++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Qpid.Client.qms;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests.failover
+{
+ [TestFixture]
+ public class FailoverTest : IConnectionListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverTest));
+
+ private IConnection _connection;
+ private IChannel _channel;
+ private IMessagePublisher _publisher;
+ private int _count;
+
+ private IMessageConsumer _consumerOfResponse;
+
+ void DoFailoverTest(ConnectionInfo info)
+ {
+ DoFailoverTest(new AMQConnection(info));
+ }
+
+ void DoFailoverTest(IConnection connection)
+ {
+ AMQConnection amqConnection = (AMQConnection)connection;
+ amqConnection.ConnectionListener = this;
+ //Console.WriteLine("connection.url = " + amqConnection.ToURL());
+ _connection = connection;
+ _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException);
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
+
+ string exchangeName = ExchangeNameDefaults.TOPIC;
+ string routingKey = "topic1";
+
+ string queueName = DeclareAndBindTemporaryQueue(exchangeName, routingKey);
+
+ new MsgListener(_connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge), queueName);
+
+ IChannel channel = _channel;
+
+ string tempQueueName = channel.GenerateUniqueName();
+ channel.DeclareQueue(tempQueueName, false, true, true);
+ _consumerOfResponse = channel.CreateConsumerBuilder(tempQueueName).Create();
+ _consumerOfResponse.OnMessage = new MessageReceivedDelegate(OnMessage);
+
+ _connection.Start();
+
+ IMessage msg = _channel.CreateTextMessage("Init");
+ // FIXME: Leaving ReplyToExchangeName as default (i.e. the default exchange)
+ // FIXME: but the implementation might not like this as it defaults to null rather than "".
+ msg.ReplyToRoutingKey = tempQueueName;
+// msg.ReplyTo = new ReplyToDestination("" /* i.e. the default exchange */, tempQueueName);
+ _logger.Info(String.Format("sending msg.Text={0}", ((ITextMessage)msg).Text));
+
+// _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
+ _publisher = _channel.CreatePublisherBuilder()
+ .withRoutingKey(routingKey)
+ .withExchangeName(exchangeName)
+ .Create();
+ _publisher.Send(msg);
+ }
+
+ public string DeclareAndBindTemporaryQueue(string exchangeName, string routingKey)
+ {
+ string queueName = _channel.GenerateUniqueName();
+
+ // Queue.Declare
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ // Queue.Bind
+ _channel.Bind(queueName, exchangeName, routingKey);
+ return queueName;
+ }
+
+ private void OnConnectionException(Exception e)
+ {
+ _logger.Error("Connection exception occurred", e);
+ }
+
+ public void OnMessage(IMessage message)
+ {
+ try
+ {
+ _logger.Info("received message on temp queue msg.Text=" + ((ITextMessage)message).Text);
+ Thread.Sleep(1000);
+ _publisher.Send(_channel.CreateTextMessage("Message" + (++_count)));
+ }
+ catch (QpidException e)
+ {
+ error(e);
+ }
+ }
+
+ private void error(Exception e)
+ {
+ _logger.Error("exception received", e);
+ stop();
+ }
+
+ private void stop()
+ {
+ _logger.Info("Stopping...");
+ try
+ {
+ _connection.Dispose();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Failed to shutdown", e);
+ }
+ }
+
+ public void BytesSent(long count)
+ {
+ }
+
+ public void BytesReceived(long count)
+ {
+ }
+
+ public bool PreFailover(bool redirect)
+ {
+ _logger.Info("preFailover(" + redirect + ") called");
+ return true;
+ }
+
+ public bool PreResubscribe()
+ {
+ _logger.Info("preResubscribe() called");
+ return true;
+ }
+
+ public void FailoverComplete()
+ {
+ _logger.Info("failoverComplete() called");
+ }
+
+ private class MsgListener
+ {
+ private IChannel _session;
+ private IMessagePublisher _publisher;
+
+ internal MsgListener(IChannel session, string queueName)
+ {
+ _session = session;
+ _session.CreateConsumerBuilder(queueName).Create().OnMessage =
+ new MessageReceivedDelegate(OnMessage);
+ }
+
+ public void OnMessage(IMessage message)
+ {
+ try
+ {
+ _logger.Info("Received: msg.Text = " + ((ITextMessage) message).Text);
+ if(_publisher == null)
+ {
+ _publisher = init(message);
+ }
+ reply(message);
+ }
+ catch (QpidException e)
+ {
+// Error(e);
+ _logger.Error("yikes", e); // XXX
+ }
+ }
+
+ private void reply(IMessage message)
+ {
+ string msg = ((ITextMessage) message).Text;
+ _logger.Info("sending reply - " + msg);
+ _publisher.Send(_session.CreateTextMessage(msg));
+ }
+
+ private IMessagePublisher init(IMessage message)
+ {
+ _logger.Info(string.Format("creating reply producer with dest = '{0}:{1}'",
+ message.ReplyToExchangeName, message.ReplyToRoutingKey));
+
+ string exchangeName = message.ReplyToExchangeName;
+ string routingKey = message.ReplyToRoutingKey;
+
+ //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey);
+ return _session.CreatePublisherBuilder()
+ .withExchangeName(exchangeName)
+ .withRoutingKey(routingKey)
+ .Create();
+ }
+ }
+
+ [Test]
+ public void TestWithBasicInfo()
+ {
+ Console.WriteLine("TestWithBasicInfo");
+ try
+ {
+ QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+// connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+
+ //connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01", 7672, false));
+
+
+ //connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false));
+
+
+ DoFailoverTest(connectionInfo);
+ while (true)
+ {
+ Thread.Sleep(5000);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Exception caught", e);
+ }
+ }
+
+// [Test]
+// public void TestWithUrl()
+// {
+// String clientId = "failover" + DateTime.Now.Ticks;
+// String defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+// "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
+//
+// _logger.Info("url = [" + defaultUrl + "]");
+//
+// // _logger.Info("connection url = [" + new AMQConnectionURL(defaultUrl) + "]");
+//
+// String broker = defaultUrl;
+// //new FailoverTest(broker);
+// }
+ }
+}
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
new file mode 100644
index 0000000000..ad1570ed14
--- /dev/null
+++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Qpid.Client.qms;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests.failover
+{
+ [TestFixture]
+ public class FailoverTxTest : IConnectionListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest));
+
+ const int NUM_ITERATIONS = 3;
+ const int NUM_MESSAGES = 10;
+ const int SLEEP_MILLIS = 500;
+
+ AMQConnection _connection;
+
+ public void onMessage(IMessage message)
+ {
+ try
+ {
+ _log.Info("Received: " + ((ITextMessage) message).Text);
+ }
+ catch (QpidException e)
+ {
+ error(e);
+ }
+ }
+
+ void DoFailoverTxTest(ConnectionInfo connectionInfo)
+ {
+ _connection = new AMQConnection(connectionInfo);
+ _connection.ConnectionListener = this;
+ _log.Info("connection = " + _connection);
+ _log.Info("connectionInfo = " + connectionInfo);
+ _log.Info("connection.asUrl = " + _connection.toURL());
+
+ IChannel session = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
+
+ string queueName = session.GenerateUniqueName();
+
+ // Queue.Declare
+ session.DeclareQueue(queueName, false, true, true);
+
+ // No need to call Queue.Bind as automatically bound to default direct exchange.
+// channel.Bind(queueName, exchangeName, routingKey);
+
+ session.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+
+ _connection.Start();
+
+ sendInTx(queueName);
+
+ _connection.Close();
+ _log.Info("FailoverTxText complete");
+ }
+
+ private void sendInTx(string routingKey)
+ {
+ _log.Info("sendInTx");
+ bool transacted = false;
+ IChannel session = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+ IMessagePublisher publisher = session.CreatePublisherBuilder()
+ .withRoutingKey(routingKey)
+ .Create();
+
+ for (int i = 1; i <= NUM_ITERATIONS; ++i)
+ {
+ for (int j = 1; j <= NUM_MESSAGES; ++j)
+ {
+ ITextMessage msg = session.CreateTextMessage("Tx=" + i + " msg=" + j);
+ _log.Info("sending message = " + msg.Text);
+ publisher.Send(msg);
+ Thread.Sleep(SLEEP_MILLIS);
+ }
+ if (transacted) session.Commit();
+ }
+ }
+
+ private void error(Exception e)
+ {
+ _log.Fatal("Exception received. About to stop.", e);
+ stop();
+ }
+
+ private void stop()
+ {
+ _log.Info("Stopping...");
+ try
+ {
+ _connection.Close();
+ }
+ catch (QpidException e)
+ {
+ _log.Info("Failed to shutdown: ", e);
+ }
+ }
+
+ public void BytesSent(long count)
+ {
+ }
+
+ public void BytesReceived(long count)
+ {
+ }
+
+ public bool PreFailover(bool redirect)
+ {
+ _log.Info("preFailover(" + redirect + ") called");
+ return true;
+ }
+
+ public bool PreResubscribe()
+ {
+ _log.Info("preResubscribe() called");
+ return true;
+ }
+
+ public void FailoverComplete()
+ {
+ _log.Info("failoverComplete() called");
+ }
+
+ [Test]
+ public void TestWithBasicInfo()
+ {
+ Console.WriteLine("TestWithBasicInfo");
+ Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion());
+ try
+ {
+ QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+
+ bool local = true;
+ if (local)
+ {
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+ }
+ else
+ {
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false));
+ }
+
+ DoFailoverTxTest(connectionInfo);
+ }
+ catch (Exception e)
+ {
+ _log.Error("Exception caught", e);
+ }
+ }
+
+ //[Test]
+ //public void runTestWithUrl()
+ //{
+ // try {
+ // String clientId = "failover" + DateTime.Now.Ticks;
+ // string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+ // "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
+
+ // _log.Info("url = [" + defaultUrl + "]");
+
+ // _log.Info("connection url = [" + new AMQConnectionInfo(defaultUrl) + "]");
+
+ // DoFailoverTxTest(new AMQConnectionInfo(defaultUrl));
+ // } catch (Exception e) {
+ // _log.Error("test failed", e);
+ // }
+ //}
+ }
+}