diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
| commit | 7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch) | |
| tree | 3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client.Tests/failover | |
| parent | 8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff) | |
| download | qpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz | |
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client.Tests/failover')
| -rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTest.cs | 258 | ||||
| -rw-r--r-- | dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs | 194 |
2 files changed, 452 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs new file mode 100644 index 0000000000..6b3ab068f5 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Client.qms; +using Qpid.Messaging; + +namespace Qpid.Client.Tests.failover +{ + [TestFixture] + public class FailoverTest : IConnectionListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverTest)); + + private IConnection _connection; + private IChannel _channel; + private IMessagePublisher _publisher; + private int _count; + + private IMessageConsumer _consumerOfResponse; + + void DoFailoverTest(ConnectionInfo info) + { + DoFailoverTest(new AMQConnection(info)); + } + + void DoFailoverTest(IConnection connection) + { + AMQConnection amqConnection = (AMQConnection)connection; + amqConnection.ConnectionListener = this; + //Console.WriteLine("connection.url = " + amqConnection.ToURL()); + _connection = connection; + _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException); + _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + + string exchangeName = ExchangeNameDefaults.TOPIC; + string routingKey = "topic1"; + + string queueName = DeclareAndBindTemporaryQueue(exchangeName, routingKey); + + new MsgListener(_connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge), queueName); + + IChannel channel = _channel; + + string tempQueueName = channel.GenerateUniqueName(); + channel.DeclareQueue(tempQueueName, false, true, true); + _consumerOfResponse = channel.CreateConsumerBuilder(tempQueueName).Create(); + _consumerOfResponse.OnMessage = new MessageReceivedDelegate(OnMessage); + + _connection.Start(); + + IMessage msg = _channel.CreateTextMessage("Init"); + // FIXME: Leaving ReplyToExchangeName as default (i.e. the default exchange) + // FIXME: but the implementation might not like this as it defaults to null rather than "". + msg.ReplyToRoutingKey = tempQueueName; +// msg.ReplyTo = new ReplyToDestination("" /* i.e. the default exchange */, tempQueueName); + _logger.Info(String.Format("sending msg.Text={0}", ((ITextMessage)msg).Text)); + +// _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); + _publisher = _channel.CreatePublisherBuilder() + .withRoutingKey(routingKey) + .withExchangeName(exchangeName) + .Create(); + _publisher.Send(msg); + } + + public string DeclareAndBindTemporaryQueue(string exchangeName, string routingKey) + { + string queueName = _channel.GenerateUniqueName(); + + // Queue.Declare + _channel.DeclareQueue(queueName, false, true, true); + + // Queue.Bind + _channel.Bind(queueName, exchangeName, routingKey); + return queueName; + } + + private void OnConnectionException(Exception e) + { + _logger.Error("Connection exception occurred", e); + } + + public void OnMessage(IMessage message) + { + try + { + _logger.Info("received message on temp queue msg.Text=" + ((ITextMessage)message).Text); + Thread.Sleep(1000); + _publisher.Send(_channel.CreateTextMessage("Message" + (++_count))); + } + catch (QpidException e) + { + error(e); + } + } + + private void error(Exception e) + { + _logger.Error("exception received", e); + stop(); + } + + private void stop() + { + _logger.Info("Stopping..."); + try + { + _connection.Dispose(); + } + catch (QpidException e) + { + _logger.Error("Failed to shutdown", e); + } + } + + public void BytesSent(long count) + { + } + + public void BytesReceived(long count) + { + } + + public bool PreFailover(bool redirect) + { + _logger.Info("preFailover(" + redirect + ") called"); + return true; + } + + public bool PreResubscribe() + { + _logger.Info("preResubscribe() called"); + return true; + } + + public void FailoverComplete() + { + _logger.Info("failoverComplete() called"); + } + + private class MsgListener + { + private IChannel _session; + private IMessagePublisher _publisher; + + internal MsgListener(IChannel session, string queueName) + { + _session = session; + _session.CreateConsumerBuilder(queueName).Create().OnMessage = + new MessageReceivedDelegate(OnMessage); + } + + public void OnMessage(IMessage message) + { + try + { + _logger.Info("Received: msg.Text = " + ((ITextMessage) message).Text); + if(_publisher == null) + { + _publisher = init(message); + } + reply(message); + } + catch (QpidException e) + { +// Error(e); + _logger.Error("yikes", e); // XXX + } + } + + private void reply(IMessage message) + { + string msg = ((ITextMessage) message).Text; + _logger.Info("sending reply - " + msg); + _publisher.Send(_session.CreateTextMessage(msg)); + } + + private IMessagePublisher init(IMessage message) + { + _logger.Info(string.Format("creating reply producer with dest = '{0}:{1}'", + message.ReplyToExchangeName, message.ReplyToRoutingKey)); + + string exchangeName = message.ReplyToExchangeName; + string routingKey = message.ReplyToRoutingKey; + + //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); + return _session.CreatePublisherBuilder() + .withExchangeName(exchangeName) + .withRoutingKey(routingKey) + .Create(); + } + } + + [Test] + public void TestWithBasicInfo() + { + Console.WriteLine("TestWithBasicInfo"); + try + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); +// connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); + + //connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01", 7672, false)); + + + //connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false)); + + + DoFailoverTest(connectionInfo); + while (true) + { + Thread.Sleep(5000); + } + } + catch (Exception e) + { + _logger.Error("Exception caught", e); + } + } + +// [Test] +// public void TestWithUrl() +// { +// String clientId = "failover" + DateTime.Now.Ticks; +// String defaultUrl = "amqp://guest:guest@" + clientId + "/test" + +// "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; +// +// _logger.Info("url = [" + defaultUrl + "]"); +// +// // _logger.Info("connection url = [" + new AMQConnectionURL(defaultUrl) + "]"); +// +// String broker = defaultUrl; +// //new FailoverTest(broker); +// } + } +} diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs new file mode 100644 index 0000000000..ad1570ed14 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -0,0 +1,194 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.InteropServices; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Client.qms; +using Qpid.Messaging; + +namespace Qpid.Client.Tests.failover +{ + [TestFixture] + public class FailoverTxTest : IConnectionListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest)); + + const int NUM_ITERATIONS = 3; + const int NUM_MESSAGES = 10; + const int SLEEP_MILLIS = 500; + + AMQConnection _connection; + + public void onMessage(IMessage message) + { + try + { + _log.Info("Received: " + ((ITextMessage) message).Text); + } + catch (QpidException e) + { + error(e); + } + } + + void DoFailoverTxTest(ConnectionInfo connectionInfo) + { + _connection = new AMQConnection(connectionInfo); + _connection.ConnectionListener = this; + _log.Info("connection = " + _connection); + _log.Info("connectionInfo = " + connectionInfo); + _log.Info("connection.asUrl = " + _connection.toURL()); + + IChannel session = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + + string queueName = session.GenerateUniqueName(); + + // Queue.Declare + session.DeclareQueue(queueName, false, true, true); + + // No need to call Queue.Bind as automatically bound to default direct exchange. +// channel.Bind(queueName, exchangeName, routingKey); + + session.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); + + _connection.Start(); + + sendInTx(queueName); + + _connection.Close(); + _log.Info("FailoverTxText complete"); + } + + private void sendInTx(string routingKey) + { + _log.Info("sendInTx"); + bool transacted = false; + IChannel session = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); + IMessagePublisher publisher = session.CreatePublisherBuilder() + .withRoutingKey(routingKey) + .Create(); + + for (int i = 1; i <= NUM_ITERATIONS; ++i) + { + for (int j = 1; j <= NUM_MESSAGES; ++j) + { + ITextMessage msg = session.CreateTextMessage("Tx=" + i + " msg=" + j); + _log.Info("sending message = " + msg.Text); + publisher.Send(msg); + Thread.Sleep(SLEEP_MILLIS); + } + if (transacted) session.Commit(); + } + } + + private void error(Exception e) + { + _log.Fatal("Exception received. About to stop.", e); + stop(); + } + + private void stop() + { + _log.Info("Stopping..."); + try + { + _connection.Close(); + } + catch (QpidException e) + { + _log.Info("Failed to shutdown: ", e); + } + } + + public void BytesSent(long count) + { + } + + public void BytesReceived(long count) + { + } + + public bool PreFailover(bool redirect) + { + _log.Info("preFailover(" + redirect + ") called"); + return true; + } + + public bool PreResubscribe() + { + _log.Info("preResubscribe() called"); + return true; + } + + public void FailoverComplete() + { + _log.Info("failoverComplete() called"); + } + + [Test] + public void TestWithBasicInfo() + { + Console.WriteLine("TestWithBasicInfo"); + Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion()); + try + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + + bool local = true; + if (local) + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); + } + else + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false)); + } + + DoFailoverTxTest(connectionInfo); + } + catch (Exception e) + { + _log.Error("Exception caught", e); + } + } + + //[Test] + //public void runTestWithUrl() + //{ + // try { + // String clientId = "failover" + DateTime.Now.Ticks; + // string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + + // "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; + + // _log.Info("url = [" + defaultUrl + "]"); + + // _log.Info("connection url = [" + new AMQConnectionInfo(defaultUrl) + "]"); + + // DoFailoverTxTest(new AMQConnectionInfo(defaultUrl)); + // } catch (Exception e) { + // _log.Error("test failed", e); + // } + //} + } +} |
