diff options
Diffstat (limited to 'dotnet/Qpid.Integration.Tests/testcases')
10 files changed, 1769 insertions, 1769 deletions
diff --git a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs index d4b61a2788..4c82dbe08c 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs @@ -1,261 +1,261 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Text;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// Provides a basis for writing Unit tests that communicate with an AMQ protocol broker. By default it creates a connection
- /// to a message broker running on localhost on the standard AMQ port, 5672, using guest:guest login credentials. It also
- /// creates a standard auto-ack channel on this connection.
- /// </summary>
- public class BaseMessagingTestFixture
- {
- private static ILog log = LogManager.GetLogger(typeof(BaseMessagingTestFixture));
-
- /// <summary> Used to build dummy data to fill test messages with. </summary>
- private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message ";
-
- /// <summary> The default timeout in milliseconds to use on receives. </summary>
- private const long RECEIVE_WAIT = 2000;
-
- /// <summary> The default AMQ connection URL to use for tests. </summary>
- public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'";
-
- /// <summary> The default AMQ connection URL parsed as a connection info. </summary>
- protected IConnectionInfo connectionInfo;
-
- /// <summary> Holds an array of connections for building mutiple test end-points. </summary>
- protected IConnection[] testConnection = new IConnection[10];
-
- /// <summary> Holds an array of channels for building mutiple test end-points. </summary>
- protected IChannel[] testChannel = new IChannel[10];
-
- /// <summary> Holds an array of queues for building mutiple test end-points. </summary>
- protected String[] testQueue = new String[10];
-
- /// <summary> Holds an array of producers for building mutiple test end-points. </summary>
- protected IMessagePublisher[] testProducer = new IMessagePublisher[10];
-
- /// <summary> Holds an array of consumers for building mutiple test end-points. </summary>
- protected IMessageConsumer[] testConsumer = new IMessageConsumer[10];
-
- /// <summary> A counter used to supply unique ids. </summary>
- private static int uniqueId = 0;
-
- /// <summary> Used to hold unique ids per test. </summary>
- protected Guid testId;
-
- /// <summary> Creates the test connection and channel. </summary>
- [SetUp]
- public virtual void Init()
- {
- log.Debug("public virtual void Init(): called");
-
- // Set up a unique id for this test.
- testId = System.Guid.NewGuid();
- }
-
- /// <summary>
- /// Disposes of the test connection. This is called manually because the connection is a field so dispose will not be automatically
- /// called on it.
- /// </summary>
- [TearDown]
- public virtual void Shutdown()
- {
- log.Debug("public virtual void Shutdown(): called");
- }
-
- /// <summary> Sets up the nth test end-point. </summary>
- ///
- /// <param name="n">The index of the test end-point to set up.</param>
- /// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param>
- /// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param>
- /// <param name="routingKey">The routing key for the producer to send on.</param>
- /// <param name="ackMode">The ack mode for the end-points channel.</param>
- /// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param>
- /// <param name="exchangeName">The exchange to produce or consume on.</param>
- /// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param>
- /// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param>
- /// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param>
- public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted,
- string exchangeName, bool declareBind, bool durable, string subscriptionName)
- {
- // Allow client id to be fixed, or undefined.
- {
- // Use unique id for end point.
- connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
-
- connectionInfo.ClientName = "test" + n;
- }
-
- testConnection[n] = new AMQConnection(connectionInfo);
- testConnection[n].Start();
- testChannel[n] = testConnection[n].CreateChannel(transacted, ackMode);
-
- if (producer)
- {
- testProducer[n] = testChannel[n].CreatePublisherBuilder()
- .WithExchangeName(exchangeName)
- .WithRoutingKey(routingKey)
- .Create();
- }
-
- if (consumer)
- {
- string queueName;
-
- // Use the subscription name as the queue name if the subscription is durable, otherwise use a generated name.
- if (durable)
- {
- // The durable queue is declared without auto-delete, and passively, in case it has already been declared.
- queueName = subscriptionName;
-
- if (declareBind)
- {
- testChannel[n].DeclareQueue(queueName, durable, true, false);
- testChannel[n].Bind(queueName, exchangeName, routingKey);
- }
- }
- else
- {
- queueName = testChannel[n].GenerateUniqueName();
-
- if (declareBind)
- {
- if (durable)
- {
- testQueue[n] = queueName;
- }
- testChannel[n].DeclareQueue(queueName, durable, true, true);
- testChannel[n].Bind(queueName, exchangeName, routingKey);
- }
- }
-
- testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).Create();
- }
- }
-
- /// <summary> Closes down the nth test end-point. </summary>
- public void CloseEndPoint(int n)
- {
- log.Debug("public void CloseEndPoint(int n): called");
-
- if (testProducer[n] != null)
- {
- testProducer[n].Close();
- testProducer[n].Dispose();
- testProducer[n] = null;
- }
-
- if (testConsumer[n] != null)
- {
- if (testQueue[n] != null)
- {
- testChannel[n].DeleteQueue(testQueue[n], false, false, true);
- }
- testConsumer[n].Close();
- testConsumer[n].Dispose();
- testConsumer[n] = null;
- }
-
- if (testConnection[n] != null)
- {
- testConnection[n].Stop();
- testConnection[n].Close();
- testConnection[n].Dispose();
- testConnection[n] = null;
- }
- }
-
- /// <summary>
- /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages
- /// are text messages with contents equal to the specified message body.
- /// </summary>
- ///
- /// <param name="n">The number of messages to consume.</param>
- /// <param name="body">The body text to match against all messages.</param>
- /// <param name="consumer">The message consumer to recieve the messages on.</param>
- public static void ConsumeNMessagesOnly(int n, string body, IMessageConsumer consumer)
- {
- ConsumeNMessages(n, body, consumer);
-
- // Check that one more than n cannot be received.
- IMessage msg = consumer.Receive(RECEIVE_WAIT);
- Assert.IsNull(msg, "Consumer got more messages than the number requested (" + n + ").");
- }
-
- /// <summary>
- /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages
- /// are text messages with contents equal to the specified message body.
- /// </summary>
- ///
- /// <param name="n">The number of messages to consume.</param>
- /// <param name="body">The body text to match against all messages.</param>
- /// <param name="consumer">The message consumer to recieve the messages on.</param>
- public static void ConsumeNMessages(int n, string body, IMessageConsumer consumer)
- {
- IMessage msg;
-
- // Try to receive n messages.
- for (int i = 0; i < n; i++)
- {
- msg = consumer.Receive(RECEIVE_WAIT);
- Assert.IsNotNull(msg, "Consumer did not receive message number: " + i);
- Assert.AreEqual(body, ((ITextMessage)msg).Text, "Incorrect Message recevied on consumer1.");
- }
- }
-
- /// <summary>Creates the requested number of bytes of dummy text. Usually used for filling test messages. </summary>
- ///
- /// <param name="size">The number of bytes of dummy text to generate.</param>
- ///
- /// <return>The requested number of bytes of dummy text.</return>
- public static String GetData(int size)
- {
- StringBuilder buf = new StringBuilder(size);
-
- if (size > 0)
- {
- int div = MESSAGE_DATA_BYTES.Length / size;
- int mod = MESSAGE_DATA_BYTES.Length % size;
-
- for (int i = 0; i < div; i++)
- {
- buf.Append(MESSAGE_DATA_BYTES);
- }
-
- if (mod != 0)
- {
- buf.Append(MESSAGE_DATA_BYTES, 0, mod);
- }
- }
-
- return buf.ToString();
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// Provides a basis for writing Unit tests that communicate with an AMQ protocol broker. By default it creates a connection + /// to a message broker running on localhost on the standard AMQ port, 5672, using guest:guest login credentials. It also + /// creates a standard auto-ack channel on this connection. + /// </summary> + public class BaseMessagingTestFixture + { + private static ILog log = LogManager.GetLogger(typeof(BaseMessagingTestFixture)); + + /// <summary> Used to build dummy data to fill test messages with. </summary> + private const string MESSAGE_DATA_BYTES = "-- Test Message -- Test Message -- Test Message -- Test Message -- Test Message "; + + /// <summary> The default timeout in milliseconds to use on receives. </summary> + private const long RECEIVE_WAIT = 2000; + + /// <summary> The default AMQ connection URL to use for tests. </summary> + public const string connectionUri = "amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672'"; + + /// <summary> The default AMQ connection URL parsed as a connection info. </summary> + protected IConnectionInfo connectionInfo; + + /// <summary> Holds an array of connections for building mutiple test end-points. </summary> + protected IConnection[] testConnection = new IConnection[10]; + + /// <summary> Holds an array of channels for building mutiple test end-points. </summary> + protected IChannel[] testChannel = new IChannel[10]; + + /// <summary> Holds an array of queues for building mutiple test end-points. </summary> + protected String[] testQueue = new String[10]; + + /// <summary> Holds an array of producers for building mutiple test end-points. </summary> + protected IMessagePublisher[] testProducer = new IMessagePublisher[10]; + + /// <summary> Holds an array of consumers for building mutiple test end-points. </summary> + protected IMessageConsumer[] testConsumer = new IMessageConsumer[10]; + + /// <summary> A counter used to supply unique ids. </summary> + private static int uniqueId = 0; + + /// <summary> Used to hold unique ids per test. </summary> + protected Guid testId; + + /// <summary> Creates the test connection and channel. </summary> + [SetUp] + public virtual void Init() + { + log.Debug("public virtual void Init(): called"); + + // Set up a unique id for this test. + testId = System.Guid.NewGuid(); + } + + /// <summary> + /// Disposes of the test connection. This is called manually because the connection is a field so dispose will not be automatically + /// called on it. + /// </summary> + [TearDown] + public virtual void Shutdown() + { + log.Debug("public virtual void Shutdown(): called"); + } + + /// <summary> Sets up the nth test end-point. </summary> + /// + /// <param name="n">The index of the test end-point to set up.</param> + /// <param name="producer"><tt>true</tt> to set up a producer on the end-point.</param> + /// <param name="consumer"><tt>true</tt> to set up a consumer on the end-point.</param> + /// <param name="routingKey">The routing key for the producer to send on.</param> + /// <param name="ackMode">The ack mode for the end-points channel.</param> + /// <param name="transacted"><tt>true</tt> to use transactions on the end-points channel.</param> + /// <param name="exchangeName">The exchange to produce or consume on.</param> + /// <param name="declareBind"><tt>true</tt> if the consumers queue should be declared and bound, <tt>false</tt> if it has already been.</param> + /// <param name="durable"><tt>true</tt> to declare the consumers queue as durable.</param> + /// <param name="subscriptionName">If durable is true, the fixed unique queue name to use.</param> + public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted, + string exchangeName, bool declareBind, bool durable, string subscriptionName) + { + // Allow client id to be fixed, or undefined. + { + // Use unique id for end point. + connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); + + connectionInfo.ClientName = "test" + n; + } + + testConnection[n] = new AMQConnection(connectionInfo); + testConnection[n].Start(); + testChannel[n] = testConnection[n].CreateChannel(transacted, ackMode); + + if (producer) + { + testProducer[n] = testChannel[n].CreatePublisherBuilder() + .WithExchangeName(exchangeName) + .WithRoutingKey(routingKey) + .Create(); + } + + if (consumer) + { + string queueName; + + // Use the subscription name as the queue name if the subscription is durable, otherwise use a generated name. + if (durable) + { + // The durable queue is declared without auto-delete, and passively, in case it has already been declared. + queueName = subscriptionName; + + if (declareBind) + { + testChannel[n].DeclareQueue(queueName, durable, true, false); + testChannel[n].Bind(queueName, exchangeName, routingKey); + } + } + else + { + queueName = testChannel[n].GenerateUniqueName(); + + if (declareBind) + { + if (durable) + { + testQueue[n] = queueName; + } + testChannel[n].DeclareQueue(queueName, durable, true, true); + testChannel[n].Bind(queueName, exchangeName, routingKey); + } + } + + testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).Create(); + } + } + + /// <summary> Closes down the nth test end-point. </summary> + public void CloseEndPoint(int n) + { + log.Debug("public void CloseEndPoint(int n): called"); + + if (testProducer[n] != null) + { + testProducer[n].Close(); + testProducer[n].Dispose(); + testProducer[n] = null; + } + + if (testConsumer[n] != null) + { + if (testQueue[n] != null) + { + testChannel[n].DeleteQueue(testQueue[n], false, false, true); + } + testConsumer[n].Close(); + testConsumer[n].Dispose(); + testConsumer[n] = null; + } + + if (testConnection[n] != null) + { + testConnection[n].Stop(); + testConnection[n].Close(); + testConnection[n].Dispose(); + testConnection[n] = null; + } + } + + /// <summary> + /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages + /// are text messages with contents equal to the specified message body. + /// </summary> + /// + /// <param name="n">The number of messages to consume.</param> + /// <param name="body">The body text to match against all messages.</param> + /// <param name="consumer">The message consumer to recieve the messages on.</param> + public static void ConsumeNMessagesOnly(int n, string body, IMessageConsumer consumer) + { + ConsumeNMessages(n, body, consumer); + + // Check that one more than n cannot be received. + IMessage msg = consumer.Receive(RECEIVE_WAIT); + Assert.IsNull(msg, "Consumer got more messages than the number requested (" + n + ")."); + } + + /// <summary> + /// Consumes n messages, checking that the n+1th is not available within a timeout, and that the consumed messages + /// are text messages with contents equal to the specified message body. + /// </summary> + /// + /// <param name="n">The number of messages to consume.</param> + /// <param name="body">The body text to match against all messages.</param> + /// <param name="consumer">The message consumer to recieve the messages on.</param> + public static void ConsumeNMessages(int n, string body, IMessageConsumer consumer) + { + IMessage msg; + + // Try to receive n messages. + for (int i = 0; i < n; i++) + { + msg = consumer.Receive(RECEIVE_WAIT); + Assert.IsNotNull(msg, "Consumer did not receive message number: " + i); + Assert.AreEqual(body, ((ITextMessage)msg).Text, "Incorrect Message recevied on consumer1."); + } + } + + /// <summary>Creates the requested number of bytes of dummy text. Usually used for filling test messages. </summary> + /// + /// <param name="size">The number of bytes of dummy text to generate.</param> + /// + /// <return>The requested number of bytes of dummy text.</return> + public static String GetData(int size) + { + StringBuilder buf = new StringBuilder(size); + + if (size > 0) + { + int div = MESSAGE_DATA_BYTES.Length / size; + int mod = MESSAGE_DATA_BYTES.Length % size; + + for (int i = 0; i < div; i++) + { + buf.Append(MESSAGE_DATA_BYTES); + } + + if (mod != 0) + { + buf.Append(MESSAGE_DATA_BYTES, 0, mod); + } + } + + return buf.ToString(); + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs index e34864aefd..4692e7ecb1 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs @@ -1,237 +1,237 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Net;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-using NUnit.Framework;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// Test the queue methods
- /// </summary>
- [TestFixture, Category("Integration")]
- public class ChannelQueueTest
- {
- private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));
-
- /// <summary> The default AMQ connection URL to use for tests. </summary>
- const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
- const string _routingKey = "ServiceQ1";
-
- private ExceptionListenerDelegate _exceptionDelegate;
- private AutoResetEvent _evt = new AutoResetEvent(false);
- private Exception _lastException = null;
-
- private IMessageConsumer _consumer;
- private IMessagePublisher _publisher;
- private IChannel _channel;
- private IConnection _connection;
-
- private string _queueName;
-
- [SetUp]
- public virtual void Init()
- {
- _logger.Info("public virtual void Init(): called");
-
- // Create a connection to the broker.
- IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
- _connection = new AMQConnection(connectionInfo);
- _logger.Info("Starting...");
-
- // Register this to listen for exceptions on the test connection.
- _exceptionDelegate = new ExceptionListenerDelegate(OnException);
- _connection.ExceptionListener += _exceptionDelegate;
-
- // Establish a session on the broker.
- _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
-
- // Create a durable, non-temporary, non-exclusive queue.
- _queueName = _channel.GenerateUniqueName();
- _channel.DeclareQueue(_queueName, true, false, false);
-
- _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
-
- // Clear the most recent message and exception.
- _lastException = null;
- }
-
- [TearDown]
- public virtual void ShutDown()
- {
- _logger.Info("public virtual void Shutdown(): called");
-
- if (_connection != null)
- {
- _logger.Info("Disposing connection.");
- _connection.Dispose();
- _logger.Info("Connection disposed.");
- }
- }
-
- [Test]
- public void DeleteUsedQueue()
- {
- // Create the consumer
- _consumer = _channel.CreateConsumerBuilder(_queueName)
- .WithPrefetchLow(100)
- .Create();
- _logger.Info("Consumer was created...");
-
- // delete the queue
- _channel.DeleteQueue(_queueName, false, true, true);
- _logger.InfoFormat("Queue {0} was delete", _queueName);
-
- Assert.IsNull(_lastException);
- }
-
- [Test]
- public void DeleteUnusedQueue()
- {
- // delete the queue
- _channel.DeleteQueue(_queueName, true, true, true);
- _logger.InfoFormat("Queue {0} was delete", _queueName);
-
- Assert.IsNull(_lastException);
- }
-
- [Test]
- public void DeleteNonEmptyQueue()
- {
- // Create the publisher
- _publisher = _channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .WithRoutingKey(_routingKey)
- .Create();
- _logger.Info("Publisher created...");
- SendTestMessage("DeleteNonEmptyQueue Message 1");
-
- try
- {
- _channel.DeleteQueue(_queueName, true, false, true);
- }
- catch (AMQException)
- {
- Assert.Fail("The test fails");
- }
- }
-
- [Test]
- public void DeleteEmptyQueue()
- {
- // Create the publisher
- _publisher = _channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .WithRoutingKey(_routingKey)
- .Create();
- _logger.Info("Publisher created...");
-
- // delete an empty queue with ifEmpty = true
- _channel.DeleteQueue(_queueName, false, true, true);
-
- Assert.IsNull(_lastException);
- }
-
- [Test]
- public void DeleteQueueWithResponse()
- {
- // Create the publisher
- _publisher = _channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .WithRoutingKey(_routingKey)
- .Create();
- _logger.Info("Publisher created...");
-
- SendTestMessage("DeleteQueueWithResponse Message 1");
- SendTestMessage("DeleteQueueWithResponse Message 2");
-
- // delete the queue, the server must respond
- _channel.DeleteQueue(_queueName, false, false, false);
- }
-
- [Test]
- public void PurgeQueueWithResponse()
- {
- _publisher = _channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .WithRoutingKey(_routingKey)
- .Create();
- _logger.Info("Pubisher created");
-
- SendTestMessage("Message 1");
- SendTestMessage("Message 2");
-
- _channel.PurgeQueue(_queueName, false);
- }
-
- [Test]
- public void PurgeQueueWithOutResponse()
- {
- _publisher = _channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .WithRoutingKey(_routingKey)
- .Create();
- _logger.Info("Pubisher created");
-
- SendTestMessage("Message 1");
- SendTestMessage("Message 2");
-
- _channel.PurgeQueue(_queueName, true);
- }
-
-
- /// <summary>
- /// Callback method to handle any exceptions raised by the test connection.</summary> ///
- /// <param name="e">The connection exception.</param>
- public void OnException(Exception e)
- {
- // Preserve the most recent exception in case test cases need to examine it.
- _lastException = e;
-
- // Notify any waiting threads that an exception event has occurred.
- _evt.Set();
- }
-
- /// <summary>
- /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
- /// depending on whether or not the message should be received by the consumer.
- ///
- /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
- /// </summary>
- ///
- /// <param name="msgSend">The message to send.</param>
- private void SendTestMessage(string msg)
- {
- // create the IMessage object
- IMessage msgSend = _channel.CreateTextMessage(msg);
-
- // send the message
- _publisher.Send(msgSend);
- _logger.InfoFormat("The messages \"{0}\" was sent", msg);
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Net; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; +using Apache.Qpid.Messaging; +using NUnit.Framework; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// Test the queue methods + /// </summary> + [TestFixture, Category("Integration")] + public class ChannelQueueTest + { + private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest)); + + /// <summary> The default AMQ connection URL to use for tests. </summary> + const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; + const string _routingKey = "ServiceQ1"; + + private ExceptionListenerDelegate _exceptionDelegate; + private AutoResetEvent _evt = new AutoResetEvent(false); + private Exception _lastException = null; + + private IMessageConsumer _consumer; + private IMessagePublisher _publisher; + private IChannel _channel; + private IConnection _connection; + + private string _queueName; + + [SetUp] + public virtual void Init() + { + _logger.Info("public virtual void Init(): called"); + + // Create a connection to the broker. + IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI); + _connection = new AMQConnection(connectionInfo); + _logger.Info("Starting..."); + + // Register this to listen for exceptions on the test connection. + _exceptionDelegate = new ExceptionListenerDelegate(OnException); + _connection.ExceptionListener += _exceptionDelegate; + + // Establish a session on the broker. + _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1); + + // Create a durable, non-temporary, non-exclusive queue. + _queueName = _channel.GenerateUniqueName(); + _channel.DeclareQueue(_queueName, true, false, false); + + _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey); + + // Clear the most recent message and exception. + _lastException = null; + } + + [TearDown] + public virtual void ShutDown() + { + _logger.Info("public virtual void Shutdown(): called"); + + if (_connection != null) + { + _logger.Info("Disposing connection."); + _connection.Dispose(); + _logger.Info("Connection disposed."); + } + } + + [Test] + public void DeleteUsedQueue() + { + // Create the consumer + _consumer = _channel.CreateConsumerBuilder(_queueName) + .WithPrefetchLow(100) + .Create(); + _logger.Info("Consumer was created..."); + + // delete the queue + _channel.DeleteQueue(_queueName, false, true, true); + _logger.InfoFormat("Queue {0} was delete", _queueName); + + Assert.IsNull(_lastException); + } + + [Test] + public void DeleteUnusedQueue() + { + // delete the queue + _channel.DeleteQueue(_queueName, true, true, true); + _logger.InfoFormat("Queue {0} was delete", _queueName); + + Assert.IsNull(_lastException); + } + + [Test] + public void DeleteNonEmptyQueue() + { + // Create the publisher + _publisher = _channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(_routingKey) + .Create(); + _logger.Info("Publisher created..."); + SendTestMessage("DeleteNonEmptyQueue Message 1"); + + try + { + _channel.DeleteQueue(_queueName, true, false, true); + } + catch (AMQException) + { + Assert.Fail("The test fails"); + } + } + + [Test] + public void DeleteEmptyQueue() + { + // Create the publisher + _publisher = _channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(_routingKey) + .Create(); + _logger.Info("Publisher created..."); + + // delete an empty queue with ifEmpty = true + _channel.DeleteQueue(_queueName, false, true, true); + + Assert.IsNull(_lastException); + } + + [Test] + public void DeleteQueueWithResponse() + { + // Create the publisher + _publisher = _channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(_routingKey) + .Create(); + _logger.Info("Publisher created..."); + + SendTestMessage("DeleteQueueWithResponse Message 1"); + SendTestMessage("DeleteQueueWithResponse Message 2"); + + // delete the queue, the server must respond + _channel.DeleteQueue(_queueName, false, false, false); + } + + [Test] + public void PurgeQueueWithResponse() + { + _publisher = _channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(_routingKey) + .Create(); + _logger.Info("Pubisher created"); + + SendTestMessage("Message 1"); + SendTestMessage("Message 2"); + + _channel.PurgeQueue(_queueName, false); + } + + [Test] + public void PurgeQueueWithOutResponse() + { + _publisher = _channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(_routingKey) + .Create(); + _logger.Info("Pubisher created"); + + SendTestMessage("Message 1"); + SendTestMessage("Message 2"); + + _channel.PurgeQueue(_queueName, true); + } + + + /// <summary> + /// Callback method to handle any exceptions raised by the test connection.</summary> /// + /// <param name="e">The connection exception.</param> + public void OnException(Exception e) + { + // Preserve the most recent exception in case test cases need to examine it. + _lastException = e; + + // Notify any waiting threads that an exception event has occurred. + _evt.Set(); + } + + /// <summary> + /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not + /// depending on whether or not the message should be received by the consumer. + /// + /// Any exceptions raised by the connection will cause an Assert failure exception to be raised. + /// </summary> + /// + /// <param name="msgSend">The message to send.</param> + private void SendTestMessage(string msg) + { + // create the IMessage object + IMessage msgSend = _channel.CreateTextMessage(msg); + + // send the message + _publisher.Send(msgSend); + _logger.InfoFormat("The messages \"{0}\" was sent", msg); + } + + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs b/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs index 72074da809..dbb3f70aec 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/CommitRollbackTest.cs @@ -1,261 +1,261 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// CommitRollbackTest
- ///
- /// <p><table id="crc"><caption>CRC Card</caption>
- /// <tr><th> Responsibilities <th> Collaborations
- /// <tr><td> Check that an uncommitted send cannot be received.
- /// <tr><td> Check that a committed send can be received.
- /// <tr><td> Check that a rolled back send cannot be received.
- /// <tr><td> Check that an uncommitted receive can be re-received.
- /// <tr><td> Check that a committed receive cannot be re-received.
- /// <tr><td> Check that a rolled back receive can be re-received.
- /// </table>
- /// </summary>
- [TestFixture, Category("Integration")]
- public class CommitRollbackTest : BaseMessagingTestFixture
- {
- /// <summary>Used for debugging purposes.</summary>
- private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest));
-
- /// <summary>Defines the name of the test topic to use with the tests.</summary>
- public const string TEST_ROUTING_KEY = "commitrollbacktestkey";
-
- /// <summary>Used to count test messages received so far.</summary>
- private int messageReceivedCount;
-
- /// <summary>Used to hold the expected number of messages to receive.</summary>
- private int expectedMessageCount;
-
- /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary>
- AutoResetEvent finishedEvent;
-
- /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary>
- private bool allReceived;
-
- [SetUp]
- public override void Init()
- {
- base.Init();
-
- // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key.
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
- SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
-
- // Clear counts
- messageReceivedCount = 0;
- expectedMessageCount = 0;
- finishedEvent = new AutoResetEvent(false);
- allReceived = false;
- }
-
- [TearDown]
- public override void Shutdown()
- {
- try
- {
- // Clean up after the test.
- CloseEndPoint(0);
- CloseEndPoint(1);
- }
- finally
- {
- base.Shutdown();
- }
- }
-
- /// <summary> Check that an uncommitted send cannot be received. </summary>
- [Test]
- public void TestUncommittedSendNotReceived()
- {
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
-
- // Try to receive messages.
- ConsumeNMessagesOnly(0, "A", testConsumer[1]);
- testChannel[1].Commit();
- }
-
- /// <summary> Check that a committed send can be received. </summary>
- [Test]
- public void TestCommittedSendReceived()
- {
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
- testChannel[0].Commit();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "B", testConsumer[1]);
- testChannel[1].Commit();
- }
-
- /// <summary> Check that a rolled back send cannot be received. </summary>
- [Test]
- public void TestRolledBackSendNotReceived()
- {
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
- testChannel[0].Rollback();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(0, "B", testConsumer[1]);
- testChannel[1].Commit();
- }
-
- /// <summary> Check that an uncommitted receive can be re-received. </summary>
- [Test]
- public void TestUncommittedReceiveCanBeRereceived()
- {
- // Create a third end-point as an alternative delivery route for the message.
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
- true, false, null);
-
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
- testChannel[0].Commit();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "C", testConsumer[1]);
-
- // Close end-point 1 without committing the message, then re-open to consume again.
- CloseEndPoint(1);
-
- // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "C", testConsumer[2]);
-
- CloseEndPoint(2);
- }
-
- /// <summary> Check that a committed receive cannot be re-received. </summary>
- [Test]
- public void TestCommittedReceiveNotRereceived()
- {
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
- testChannel[0].Commit();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "D", testConsumer[1]);
- testChannel[1].Commit();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(0, "D", testConsumer[1]);
- }
-
- /// <summary> Check that a rolled back receive can be re-received. </summary>
- [Test]
- public void TestRolledBackReceiveCanBeRereceived()
- {
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("E"));
- testChannel[0].Commit();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "E", testConsumer[1]);
-
- testChannel[1].Rollback();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "E", testConsumer[1]);
-
- }
-
- [Test]
- public void TestReceiveAndSendRollback()
- {
- // Send messages
- testProducer[0].Send(testChannel[0].CreateTextMessage("F"));
- testChannel[0].Commit();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "F", testConsumer[1]);
- testProducer[1].Send(testChannel[1].CreateTextMessage("G"));
- testChannel[1].Rollback();
-
- // Try to receive messages.
- ConsumeNMessagesOnly(1, "F", testConsumer[1]);
-
- }
-
- [Test]
- public void TestReceivePrePublished()
- {
- // Send messages
- for (int i = 0; i < 10; ++i)
- {
- testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i));
- testChannel[0].Commit();
- }
-
- for (int i = 0; i < 10; ++i)
- {
- ConsumeNMessages(1, "G"+i, testConsumer[1]);
- }
- testChannel[1].Commit();
- }
-
- [Test]
- public void TestReceivePrePublishedOnMessageHandler()
- {
- testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage);
- // Send messages
- for (int i = 0; i < 10; ++i)
- {
- testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i));
- testChannel[0].Commit();
- }
- expectedMessageCount = 10;
-
- finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
-
- // Check that all messages really were received.
- Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount);
-
- testChannel[1].Commit();
- }
-
- /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary>
- public void OnMessage(IMessage m)
- {
- int newCount = Interlocked.Increment(ref messageReceivedCount);
-
- if (newCount >= expectedMessageCount)
- {
- allReceived = true;
- finishedEvent.Set();
- }
- }
-
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// CommitRollbackTest + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Check that an uncommitted send cannot be received. + /// <tr><td> Check that a committed send can be received. + /// <tr><td> Check that a rolled back send cannot be received. + /// <tr><td> Check that an uncommitted receive can be re-received. + /// <tr><td> Check that a committed receive cannot be re-received. + /// <tr><td> Check that a rolled back receive can be re-received. + /// </table> + /// </summary> + [TestFixture, Category("Integration")] + public class CommitRollbackTest : BaseMessagingTestFixture + { + /// <summary>Used for debugging purposes.</summary> + private static ILog log = LogManager.GetLogger(typeof(CommitRollbackTest)); + + /// <summary>Defines the name of the test topic to use with the tests.</summary> + public const string TEST_ROUTING_KEY = "commitrollbacktestkey"; + + /// <summary>Used to count test messages received so far.</summary> + private int messageReceivedCount; + + /// <summary>Used to hold the expected number of messages to receive.</summary> + private int expectedMessageCount; + + /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary> + AutoResetEvent finishedEvent; + + /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary> + private bool allReceived; + + [SetUp] + public override void Init() + { + base.Init(); + + // Create one producer and one consumer, p2p, tx, consumer with queue bound to producers routing key. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + SetUpEndPoint(1, true, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Clear counts + messageReceivedCount = 0; + expectedMessageCount = 0; + finishedEvent = new AutoResetEvent(false); + allReceived = false; + } + + [TearDown] + public override void Shutdown() + { + try + { + // Clean up after the test. + CloseEndPoint(0); + CloseEndPoint(1); + } + finally + { + base.Shutdown(); + } + } + + /// <summary> Check that an uncommitted send cannot be received. </summary> + [Test] + public void TestUncommittedSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "A", testConsumer[1]); + testChannel[1].Commit(); + } + + /// <summary> Check that a committed send can be received. </summary> + [Test] + public void TestCommittedSendReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "B", testConsumer[1]); + testChannel[1].Commit(); + } + + /// <summary> Check that a rolled back send cannot be received. </summary> + [Test] + public void TestRolledBackSendNotReceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + testChannel[0].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "B", testConsumer[1]); + testChannel[1].Commit(); + } + + /// <summary> Check that an uncommitted receive can be re-received. </summary> + [Test] + public void TestUncommittedReceiveCanBeRereceived() + { + // Create a third end-point as an alternative delivery route for the message. + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + true, false, null); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("C")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "C", testConsumer[1]); + + // Close end-point 1 without committing the message, then re-open to consume again. + CloseEndPoint(1); + + // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. + ConsumeNMessagesOnly(1, "C", testConsumer[2]); + + CloseEndPoint(2); + } + + /// <summary> Check that a committed receive cannot be re-received. </summary> + [Test] + public void TestCommittedReceiveNotRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("D")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "D", testConsumer[1]); + testChannel[1].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(0, "D", testConsumer[1]); + } + + /// <summary> Check that a rolled back receive can be re-received. </summary> + [Test] + public void TestRolledBackReceiveCanBeRereceived() + { + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("E")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "E", testConsumer[1]); + + } + + [Test] + public void TestReceiveAndSendRollback() + { + // Send messages + testProducer[0].Send(testChannel[0].CreateTextMessage("F")); + testChannel[0].Commit(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + testProducer[1].Send(testChannel[1].CreateTextMessage("G")); + testChannel[1].Rollback(); + + // Try to receive messages. + ConsumeNMessagesOnly(1, "F", testConsumer[1]); + + } + + [Test] + public void TestReceivePrePublished() + { + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + + for (int i = 0; i < 10; ++i) + { + ConsumeNMessages(1, "G"+i, testConsumer[1]); + } + testChannel[1].Commit(); + } + + [Test] + public void TestReceivePrePublishedOnMessageHandler() + { + testConsumer[1].OnMessage += new MessageReceivedDelegate(OnMessage); + // Send messages + for (int i = 0; i < 10; ++i) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("G"+i)); + testChannel[0].Commit(); + } + expectedMessageCount = 10; + + finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); + + // Check that all messages really were received. + Assert.IsTrue(allReceived, "All messages were not received, only got: " + messageReceivedCount + " but wanted " + expectedMessageCount); + + testChannel[1].Commit(); + } + + /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary> + public void OnMessage(IMessage m) + { + int newCount = Interlocked.Increment(ref messageReceivedCount); + + if (newCount >= expectedMessageCount) + { + allReceived = true; + finishedEvent.Set(); + } + } + + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs index 357f164346..d7b4a4ddd2 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs @@ -1,73 +1,73 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- [TestFixture, Category("Integration")]
- public class ConnectionTest
- {
- private AmqBrokerInfo _broker =
- new AmqBrokerInfo("amqp", "localhost", 5672, false);
-
- [Test]
- public void SimpleConnection()
- {
- IConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.VirtualHost = "test";
- connectionInfo.AddBrokerInfo(_broker);
- using (IConnection connection = new AMQConnection(connectionInfo))
- {
- Console.WriteLine("connection = " + connection);
- }
- }
-
- [Test]
- [ExpectedException(typeof(AMQAuthenticationException))]
- public void PasswordFailureConnection()
- {
- IConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.VirtualHost = "test";
- connectionInfo.Password = "rubbish";
- connectionInfo.AddBrokerInfo(_broker);
-
- using (IConnection connection = new AMQConnection(connectionInfo))
- {
- Console.WriteLine("connection = " + connection);
- // wrong
- Assert.Fail("Authentication succeeded but should've failed");
- }
- }
-
- [Test]
- [ExpectedException(typeof(AMQConnectionException))]
- public void ConnectionFailure()
- {
- string url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5673?retries='0''";
- new AMQConnection(QpidConnectionInfo.FromUrl(url));
- Assert.Fail("Connection should not be established");
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using NUnit.Framework; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + [TestFixture, Category("Integration")] + public class ConnectionTest + { + private AmqBrokerInfo _broker = + new AmqBrokerInfo("amqp", "localhost", 5672, false); + + [Test] + public void SimpleConnection() + { + IConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.VirtualHost = "test"; + connectionInfo.AddBrokerInfo(_broker); + using (IConnection connection = new AMQConnection(connectionInfo)) + { + Console.WriteLine("connection = " + connection); + } + } + + [Test] + [ExpectedException(typeof(AMQAuthenticationException))] + public void PasswordFailureConnection() + { + IConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.VirtualHost = "test"; + connectionInfo.Password = "rubbish"; + connectionInfo.AddBrokerInfo(_broker); + + using (IConnection connection = new AMQConnection(connectionInfo)) + { + Console.WriteLine("connection = " + connection); + // wrong + Assert.Fail("Authentication succeeded but should've failed"); + } + } + + [Test] + [ExpectedException(typeof(AMQConnectionException))] + public void ConnectionFailure() + { + string url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5673?retries='0''"; + new AMQConnection(QpidConnectionInfo.FromUrl(url)); + Assert.Fail("Connection should not be established"); + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs index ac975100b1..b7973ae3f5 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs @@ -1,166 +1,166 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// DurableSubscriptionTest checks that durable subscriptions work, by sending messages that can be picked up by
- /// a subscription that is currently off-line, and checking that the subscriber gets all of its messages when it
- /// does come on-line.
- ///
- /// <p/><table id="crc"><caption>CRC Card</caption>
- /// <tr><th> Responsibilities <th> Collaborations
- /// <tr><td>
- /// </table>
- /// </summary>
- [TestFixture, Category("Integration")]
- public class DurableSubscriptionTest : BaseMessagingTestFixture
- {
- /// <summary>Used for debugging purposes.</summary>
- private static ILog log = LogManager.GetLogger(typeof(DurableSubscriptionTest));
-
- /// <summary>Defines the name of the test topic to use with the tests.</summary>
- public const string TEST_ROUTING_KEY = "durablesubtestkey";
-
- [SetUp]
- public override void Init()
- {
- base.Init();
- }
-
- [TearDown]
- public override void Shutdown()
- {
- base.Shutdown();
- }
-
- [Test]
- public void TestDurableSubscriptionNoAck()
- {
- TestDurableSubscription(AcknowledgeMode.NoAcknowledge);
- }
-
- [Test]
- public void TestDurableSubscriptionAutoAck()
- {
- TestDurableSubscription(AcknowledgeMode.AutoAcknowledge);
- }
-
- private void TestDurableSubscription(AcknowledgeMode ackMode)
- {
- // Create a topic with one producer and two consumers.
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null);
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
- true, "TestSubscription" + testId);
-
- Thread.Sleep(500);
-
- // Send messages and receive on both consumers.
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
-
- ConsumeNMessagesOnly(1, "A", testConsumer[1]);
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
-
- // Detach one consumer.
- CloseEndPoint(2);
-
- // Send message and receive on one consumer.
- testProducer[0].Send(testChannel[0].CreateTextMessage("B"));
-
- ConsumeNMessagesOnly(1, "B", testConsumer[1]);
-
- // Re-attach consumer, check that it gets the messages that it missed.
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
- true, "TestSubscription" + testId);
-
- ConsumeNMessagesOnly(1, "B", testConsumer[2]);
-
- // Clean up any open consumers at the end of the test.
- CloseEndPoint(2);
- CloseEndPoint(1);
- CloseEndPoint(0);
- }
-
- /// <summary> Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription. </summary>
- [Test]
- public void TestUncommittedReceiveCanBeRereceivedNewConnection()
- {
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, false, null);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, true, "foo"+testId);
-
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("C"));
- testChannel[0].Commit();
-
- // Try to receive messages, but don't commit them.
- ConsumeNMessagesOnly(1, "C", testConsumer[1]);
-
- // Close end-point 1 without committing the message, then re-open the subscription to consume again.
- CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, true, "foo"+testId);
-
- // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "C", testConsumer[1]);
- testChannel[1].Commit();
- CloseEndPoint(1);
- CloseEndPoint(0);
- }
-
- /// <summary> Check that a rolled back receive can be re-received, on re-consume from the same durable subscription. </summary>
- [Test]
- public void TestRolledBackReceiveCanBeRereceivedNewConnection()
- {
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, false, null);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, true, "foo"+testId);
-
- // Send messages.
- testProducer[0].Send(testChannel[0].CreateTextMessage("D"));
- testChannel[0].Commit();
-
- // Try to receive messages, but roll them back.
- ConsumeNMessagesOnly(1, "D", testConsumer[1]);
- testChannel[1].Rollback();
-
- // Close end-point 1 without committing the message, then re-open the subscription to consume again.
- CloseEndPoint(1);
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC,
- true, true, "foo"+testId);
-
- // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "D", testConsumer[1]);
- testChannel[1].Commit();
- CloseEndPoint(1);
- CloseEndPoint(0);
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// DurableSubscriptionTest checks that durable subscriptions work, by sending messages that can be picked up by + /// a subscription that is currently off-line, and checking that the subscriber gets all of its messages when it + /// does come on-line. + /// + /// <p/><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> + /// </table> + /// </summary> + [TestFixture, Category("Integration")] + public class DurableSubscriptionTest : BaseMessagingTestFixture + { + /// <summary>Used for debugging purposes.</summary> + private static ILog log = LogManager.GetLogger(typeof(DurableSubscriptionTest)); + + /// <summary>Defines the name of the test topic to use with the tests.</summary> + public const string TEST_ROUTING_KEY = "durablesubtestkey"; + + [SetUp] + public override void Init() + { + base.Init(); + } + + [TearDown] + public override void Shutdown() + { + base.Shutdown(); + } + + [Test] + public void TestDurableSubscriptionNoAck() + { + TestDurableSubscription(AcknowledgeMode.NoAcknowledge); + } + + [Test] + public void TestDurableSubscriptionAutoAck() + { + TestDurableSubscription(AcknowledgeMode.AutoAcknowledge); + } + + private void TestDurableSubscription(AcknowledgeMode ackMode) + { + // Create a topic with one producer and two consumers. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, false, null); + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, + true, "TestSubscription" + testId); + + Thread.Sleep(500); + + // Send messages and receive on both consumers. + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + + ConsumeNMessagesOnly(1, "A", testConsumer[1]); + ConsumeNMessagesOnly(1, "A", testConsumer[2]); + + // Detach one consumer. + CloseEndPoint(2); + + // Send message and receive on one consumer. + testProducer[0].Send(testChannel[0].CreateTextMessage("B")); + + ConsumeNMessagesOnly(1, "B", testConsumer[1]); + + // Re-attach consumer, check that it gets the messages that it missed. + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, + true, "TestSubscription" + testId); + + ConsumeNMessagesOnly(1, "B", testConsumer[2]); + + // Clean up any open consumers at the end of the test. + CloseEndPoint(2); + CloseEndPoint(1); + CloseEndPoint(0); + } + + /// <summary> Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription. </summary> + [Test] + public void TestUncommittedReceiveCanBeRereceivedNewConnection() + { + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, + true, false, null); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, + true, true, "foo"+testId); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("C")); + testChannel[0].Commit(); + + // Try to receive messages, but don't commit them. + ConsumeNMessagesOnly(1, "C", testConsumer[1]); + + // Close end-point 1 without committing the message, then re-open the subscription to consume again. + CloseEndPoint(1); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, + true, true, "foo"+testId); + + // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. + ConsumeNMessagesOnly(1, "C", testConsumer[1]); + testChannel[1].Commit(); + CloseEndPoint(1); + CloseEndPoint(0); + } + + /// <summary> Check that a rolled back receive can be re-received, on re-consume from the same durable subscription. </summary> + [Test] + public void TestRolledBackReceiveCanBeRereceivedNewConnection() + { + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, + true, false, null); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, + true, true, "foo"+testId); + + // Send messages. + testProducer[0].Send(testChannel[0].CreateTextMessage("D")); + testChannel[0].Commit(); + + // Try to receive messages, but roll them back. + ConsumeNMessagesOnly(1, "D", testConsumer[1]); + testChannel[1].Rollback(); + + // Close end-point 1 without committing the message, then re-open the subscription to consume again. + CloseEndPoint(1); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.TOPIC, + true, true, "foo"+testId); + + // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. + ConsumeNMessagesOnly(1, "D", testConsumer[1]); + testChannel[1].Commit(); + CloseEndPoint(1); + CloseEndPoint(0); + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs index 5e17cf1d2d..2094aa3b1b 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs @@ -1,282 +1,282 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Framing;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to
- /// verify that it correctly matches or filters out messages based on their headers.
- ///
- /// Check that a message matching all fields of a headers exchange is passed by the exchange.
- /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange.
- /// Check that a message matching only some fields of a headers exhcnage is not passed by the exchange.
- /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange.
- /// </summary>
- ///
- /// <todo>Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented
- /// in a single test class to make running this as part of an automated test suite possible.</todo>
- ///
- /// <todo>Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the
- /// callback does demonstrate how to do so.</todo>
- [TestFixture, Category("Integration")]
- public class HeadersExchangeTest : BaseMessagingTestFixture
- {
- private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
-
- /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
- private static readonly int TIMEOUT = 2000;
-
- /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
- private string _exchangeName = "ServiceQ1";
-
- /// <summary> Used to preserve the most recent exception in case test cases need to examine it. </summary>
- private Exception _lastException = null;
-
- /// <summary> Used to preserve the most recent message from the test consumer. </summary>
- private IMessage _lastMessage = null;
-
- /// <summary> The test consumer to get messages from the broker with. </summary>
- private IMessageConsumer _consumer;
-
- private IMessagePublisher _publisher;
-
- private AutoResetEvent _evt = new AutoResetEvent(false);
-
- private MessageReceivedDelegate _msgRecDelegate;
- private ExceptionListenerDelegate _exceptionDelegate;
-
- /// <summary> Holds the test connection. </summary>
- protected IConnection _connection;
-
- /// <summary> Holds the test channel. </summary>
- protected IChannel _channel;
-
- [SetUp]
- public override void Init()
- {
- // Ensure that the base init method is called. It establishes a connection with the broker.
- base.Init();
-
- connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
- _connection = new AMQConnection(connectionInfo);
- _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300);
-
- _logger.Info("Starting...");
- _logger.Info("Exchange name is '" + _exchangeName + "'...");
-
- // Register this to listen for exceptions on the test connection.
- _exceptionDelegate = new ExceptionListenerDelegate(OnException);
- _connection.ExceptionListener += _exceptionDelegate;
-
- // Declare a new headers exchange with the name of the test service.
- _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS);
-
- // Create a non-durable, temporary (aka auto-delete), exclusive queue.
- string queueName = _channel.GenerateUniqueName();
- _channel.DeclareQueue(queueName, false, true, true);
-
- // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match.
- _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable());
-
- // Create a test consumer to consume messages from the test exchange.
- _consumer = _channel.CreateConsumerBuilder(queueName)
- .WithPrefetchLow(100)
- .WithPrefetchHigh(500)
- .WithNoLocal(false) // make sure we get our own messages
- .Create();
-
- // Register this to listen for messages on the consumer.
- _msgRecDelegate = new MessageReceivedDelegate(OnMessage);
- _consumer.OnMessage += _msgRecDelegate;
-
- // Clear the most recent message and exception.
- _lastException = null;
- _lastMessage = null;
-
- _publisher = _channel.CreatePublisherBuilder()
- .WithExchangeName(_exchangeName)
- .WithMandatory(true)
- .Create();
-
- _publisher.DeliveryMode = DeliveryMode.NonPersistent;
-
- // Start all channel
- _connection.Start();
- }
-
- /// <summary>
- /// Deregisters the on message delegate before closing the connection.
- /// </summary>
- [TearDown]
- public override void Shutdown()
- {
- _logger.Info("public void Shutdown(): called");
-
- //_consumer.OnMessage -= _msgRecDelegate;
- //_connection.ExceptionListener -= _exceptionDelegate;
-
- _connection.Stop();
- _connection.Close();
- _connection.Dispose();
-
- base.Shutdown();
- }
-
- /// <summary>
- /// Callback method that is passed any messages received on the test channel.
- /// </summary>
- ///
- /// <param name="message">The received message.</param>
- public void OnMessage(IMessage message)
- {
- _logger.Debug(string.Format("message.Type = {0}", message.GetType()));
- _logger.Debug("Got message '" + message + "'");
-
- // Preserve the most recent exception so that test cases can examine it.
- _lastMessage = message;
-
- // Notify any waiting threads that a message has been received.
- _evt.Set();
- }
-
- /// <summary>Callback method to handle any exceptions raised by the test connection.</summary>
- ///
- /// <param name="e">The connection exception.</param>
- public void OnException(Exception e)
- {
- // Preserve the most recent exception in case test cases need to examine it.
- _lastException = e;
-
- // Notify any waiting threads that an exception event has occurred.
- _evt.Set();
- }
-
- /// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary>
- [Test]
- public void TestMatchAll()
- {
- IMessage msg = _channel.CreateTextMessage("matches match2=''");
- msg.Headers["match1"] = "foo";
- msg.Headers["match2"] = "";
-
- // Use the SendTestMessage helper method to verify that the message was sent and received.
- SendTestMessage(msg, true);
- }
-
- /// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary>
- [Test]
- public void TestMatchEmptyMatchesAnything()
- {
- // Send a test message that matches the headers exchange.
- IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'");
- msg.Headers["match1"] = "foo";
- msg.Headers["match2"] = "bar";
-
- // Use the SendTestMessage helper method to verify that the message was sent and received.
- SendTestMessage(msg, true);
- }
-
- /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
- [Test]
- public void TestMatchOneFails()
- {
- IMessage msg = _channel.CreateTextMessage("not match - only match1");
- msg.Headers["match1"] = "foo";
-
- // Use the SendTestMessage helper method to verify that the message was sent and not received.
- SendTestMessage(msg, false);
- }
-
- /// <summary>
- /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by
- /// the exchange.
- /// </summary>
- [Test]
- public void TestMatchExtraFields()
- {
- IMessage msg = _channel.CreateTextMessage("matches - extra headers");
- msg.Headers["match1"] = "foo";
- msg.Headers["match2"] = "bar";
- msg.Headers["match3"] = "not required";
-
- // Use the SendTestMessage helper method to verify that the message was sent and received.
- SendTestMessage(msg, true);
- }
-
- /// <summary>
- /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
- /// depending on whether or not the message should be received by the consumer.
- ///
- /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
- /// </summary>
- ///
- /// <param name="msgSend">The message to send.</param>
- /// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param>
- private void SendTestMessage(IMessage msgSend, bool shouldPass)
- {
- _publisher.Send(msgSend);
- _evt.WaitOne(TIMEOUT, true);
-
- // Check that an exception other than not routable was raised in which case re-raise it as a test error.
- if (_lastException != null && !(_lastException.InnerException is AMQUndeliveredException))
- {
- Assert.Fail("Exception {0} was raised by the broker connection.", _lastException);
- }
- // Check that a message was returned if the test is expecting the message to pass.
- else if (shouldPass)
- {
- Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange.");
- }
- // Check that a not routable exception was raised if the test is expecting the message to fail.
- else if (_lastException != null && _lastException.InnerException is AMQUndeliveredException)
- {
- Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it.");
- }
- // The broker did not respond within the test timeout so fail the test.
- else
- {
- Assert.Fail("The test timed out without a response from the broker.");
- }
- }
-
- /// <summary> Returns a field table containing patterns to match the test header exchange against. </summary>
- ///
- /// <returns> A field table containing test patterns. </returns>
- private FieldTable CreatePatternAsFieldTable()
- {
- FieldTable matchTable = new FieldTable();
-
- matchTable["match1"] = "foo";
- matchTable["match2"] = "";
- matchTable["x-match"] = "all";
-
- return matchTable;
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to + /// verify that it correctly matches or filters out messages based on their headers. + /// + /// Check that a message matching all fields of a headers exchange is passed by the exchange. + /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange. + /// Check that a message matching only some fields of a headers exhcnage is not passed by the exchange. + /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange. + /// </summary> + /// + /// <todo>Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented + /// in a single test class to make running this as part of an automated test suite possible.</todo> + /// + /// <todo>Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the + /// callback does demonstrate how to do so.</todo> + [TestFixture, Category("Integration")] + public class HeadersExchangeTest : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest)); + + /// <summary> Holds the default test timeout for broker communications before tests give up. </summary> + private static readonly int TIMEOUT = 2000; + + /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary> + private string _exchangeName = "ServiceQ1"; + + /// <summary> Used to preserve the most recent exception in case test cases need to examine it. </summary> + private Exception _lastException = null; + + /// <summary> Used to preserve the most recent message from the test consumer. </summary> + private IMessage _lastMessage = null; + + /// <summary> The test consumer to get messages from the broker with. </summary> + private IMessageConsumer _consumer; + + private IMessagePublisher _publisher; + + private AutoResetEvent _evt = new AutoResetEvent(false); + + private MessageReceivedDelegate _msgRecDelegate; + private ExceptionListenerDelegate _exceptionDelegate; + + /// <summary> Holds the test connection. </summary> + protected IConnection _connection; + + /// <summary> Holds the test channel. </summary> + protected IChannel _channel; + + [SetUp] + public override void Init() + { + // Ensure that the base init method is called. It establishes a connection with the broker. + base.Init(); + + connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); + _connection = new AMQConnection(connectionInfo); + _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 500, 300); + + _logger.Info("Starting..."); + _logger.Info("Exchange name is '" + _exchangeName + "'..."); + + // Register this to listen for exceptions on the test connection. + _exceptionDelegate = new ExceptionListenerDelegate(OnException); + _connection.ExceptionListener += _exceptionDelegate; + + // Declare a new headers exchange with the name of the test service. + _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS); + + // Create a non-durable, temporary (aka auto-delete), exclusive queue. + string queueName = _channel.GenerateUniqueName(); + _channel.DeclareQueue(queueName, false, true, true); + + // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match. + _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable()); + + // Create a test consumer to consume messages from the test exchange. + _consumer = _channel.CreateConsumerBuilder(queueName) + .WithPrefetchLow(100) + .WithPrefetchHigh(500) + .WithNoLocal(false) // make sure we get our own messages + .Create(); + + // Register this to listen for messages on the consumer. + _msgRecDelegate = new MessageReceivedDelegate(OnMessage); + _consumer.OnMessage += _msgRecDelegate; + + // Clear the most recent message and exception. + _lastException = null; + _lastMessage = null; + + _publisher = _channel.CreatePublisherBuilder() + .WithExchangeName(_exchangeName) + .WithMandatory(true) + .Create(); + + _publisher.DeliveryMode = DeliveryMode.NonPersistent; + + // Start all channel + _connection.Start(); + } + + /// <summary> + /// Deregisters the on message delegate before closing the connection. + /// </summary> + [TearDown] + public override void Shutdown() + { + _logger.Info("public void Shutdown(): called"); + + //_consumer.OnMessage -= _msgRecDelegate; + //_connection.ExceptionListener -= _exceptionDelegate; + + _connection.Stop(); + _connection.Close(); + _connection.Dispose(); + + base.Shutdown(); + } + + /// <summary> + /// Callback method that is passed any messages received on the test channel. + /// </summary> + /// + /// <param name="message">The received message.</param> + public void OnMessage(IMessage message) + { + _logger.Debug(string.Format("message.Type = {0}", message.GetType())); + _logger.Debug("Got message '" + message + "'"); + + // Preserve the most recent exception so that test cases can examine it. + _lastMessage = message; + + // Notify any waiting threads that a message has been received. + _evt.Set(); + } + + /// <summary>Callback method to handle any exceptions raised by the test connection.</summary> + /// + /// <param name="e">The connection exception.</param> + public void OnException(Exception e) + { + // Preserve the most recent exception in case test cases need to examine it. + _lastException = e; + + // Notify any waiting threads that an exception event has occurred. + _evt.Set(); + } + + /// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary> + [Test] + public void TestMatchAll() + { + IMessage msg = _channel.CreateTextMessage("matches match2=''"); + msg.Headers["match1"] = "foo"; + msg.Headers["match2"] = ""; + + // Use the SendTestMessage helper method to verify that the message was sent and received. + SendTestMessage(msg, true); + } + + /// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary> + [Test] + public void TestMatchEmptyMatchesAnything() + { + // Send a test message that matches the headers exchange. + IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'"); + msg.Headers["match1"] = "foo"; + msg.Headers["match2"] = "bar"; + + // Use the SendTestMessage helper method to verify that the message was sent and received. + SendTestMessage(msg, true); + } + + /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary> + [Test] + public void TestMatchOneFails() + { + IMessage msg = _channel.CreateTextMessage("not match - only match1"); + msg.Headers["match1"] = "foo"; + + // Use the SendTestMessage helper method to verify that the message was sent and not received. + SendTestMessage(msg, false); + } + + /// <summary> + /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by + /// the exchange. + /// </summary> + [Test] + public void TestMatchExtraFields() + { + IMessage msg = _channel.CreateTextMessage("matches - extra headers"); + msg.Headers["match1"] = "foo"; + msg.Headers["match2"] = "bar"; + msg.Headers["match3"] = "not required"; + + // Use the SendTestMessage helper method to verify that the message was sent and received. + SendTestMessage(msg, true); + } + + /// <summary> + /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not + /// depending on whether or not the message should be received by the consumer. + /// + /// Any exceptions raised by the connection will cause an Assert failure exception to be raised. + /// </summary> + /// + /// <param name="msgSend">The message to send.</param> + /// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param> + private void SendTestMessage(IMessage msgSend, bool shouldPass) + { + _publisher.Send(msgSend); + _evt.WaitOne(TIMEOUT, true); + + // Check that an exception other than not routable was raised in which case re-raise it as a test error. + if (_lastException != null && !(_lastException.InnerException is AMQUndeliveredException)) + { + Assert.Fail("Exception {0} was raised by the broker connection.", _lastException); + } + // Check that a message was returned if the test is expecting the message to pass. + else if (shouldPass) + { + Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange."); + } + // Check that a not routable exception was raised if the test is expecting the message to fail. + else if (_lastException != null && _lastException.InnerException is AMQUndeliveredException) + { + Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it."); + } + // The broker did not respond within the test timeout so fail the test. + else + { + Assert.Fail("The test timed out without a response from the broker."); + } + } + + /// <summary> Returns a field table containing patterns to match the test header exchange against. </summary> + /// + /// <returns> A field table containing test patterns. </returns> + private FieldTable CreatePatternAsFieldTable() + { + FieldTable matchTable = new FieldTable(); + + matchTable["match1"] = "foo"; + matchTable["match2"] = ""; + matchTable["x-match"] = "all"; + + return matchTable; + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs b/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs index 6cfdad1f94..4abc56905f 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs @@ -1,149 +1,149 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// MandatoryMessageTest checks that messages sent with the 'mandatory' flag, must either be routed to a valid
- /// queue or returned to the sender when no route is available.
- ///
- /// <p><table id="crc"><caption>CRC Card</caption>
- /// <tr><th> Responsibilities <th> Collaborations
- /// <tr><td> Check default exchange returns unroutable mandatory messages.
- /// <tr><td> Check direct exchange returns unroutable mandatory messages.
- /// <tr><td> Check headers exchange returns unroutable mandatory messages.
- /// <tr><td> Check topic exchange returns unroutable mandatory messages.
- /// </table>
- /// </summary>
- [TestFixture, Category("Integration")]
- public class MandatoryMessageTest : BaseMessagingTestFixture
- {
- /// <summary>Used for debugging purposes.</summary>
- private static ILog log = LogManager.GetLogger(typeof(MandatoryMessageTest));
-
- /// <summary>Defines the maximum time in milliseconds, to wait for redelivery to occurr.</summary>
- public const int TIMEOUT = 1000;
-
- /// <summary>Defines the name of the routing key to use with the tests.</summary>
- public const string TEST_ROUTING_KEY = "unboundkey";
-
- /// <summary>Condition used to coordinate receipt of redelivery exception to the sending thread.</summary>
- private ManualResetEvent errorEvent;
-
- /// <summary>Holds the last received error condition, for examination by the tests sending thread.</summary>
- private Exception lastErrorException;
-
- /// <summary> Holds the test connection. </summary>
- protected IConnection _connection;
-
- /// <summary> Holds the test channel. </summary>
- protected IChannel _channel;
-
- [SetUp]
- public override void Init()
- {
- base.Init();
-
- errorEvent = new ManualResetEvent(false);
- lastErrorException = null;
- }
-
- [TearDown]
- public override void Shutdown()
- {
- base.Shutdown();
- }
-
- /// <summary>
- /// Handles all exception conditions on the connection. The error event is notified and the exception recorded as the last seen.
- /// </summary>
- ///
- /// <param name="e">The asynchronous exception on the connection.</param>
- public void OnException(Exception e)
- {
- lastErrorException = e;
- errorEvent.Set();
- }
-
- [Test]
- public void SendUndeliverableMessageOnDirectExchange()
- {
- SendOne(ExchangeNameDefaults.DIRECT);
- }
-
- [Test]
- public void SendUndeliverableMessageOnTopicExchange()
- {
- SendOne(ExchangeNameDefaults.TOPIC);
- }
-
- [Test]
- public void SendUndeliverableMessageOnHeadersExchange()
- {
- SendOne(ExchangeNameDefaults.HEADERS);
- }
-
- /// <summary>
- /// Sends a single message to the specified exchange with the routing key 'unboundkey', marked as mandatory.
- /// A check is performed to assert that a redelivery error is returned from the broker for the message.
- /// </summary>
- ///
- /// <param name="exchangeName">The name of the exchange to send to.</param>
- private void SendOne(string exchangeName)
- {
- log.Debug("private void SendOne(string exchangeName = " + exchangeName + "): called");
-
- // Send a test message to a unbound key on the specified exchange.
- SetUpEndPoint(0, false, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, exchangeName,
- true, false, null);
- testProducer[0] = testChannel[0].CreatePublisherBuilder()
- .WithRoutingKey(TEST_ROUTING_KEY + testId)
- .WithMandatory(true)
- .WithExchangeName(exchangeName)
- .Create();
-
- // Set up the exception listener on the connection.
- testConnection[0].ExceptionListener = new ExceptionListenerDelegate(OnException);
-
- // Send message that should fail.
- testProducer[0].Send(testChannel[0].CreateTextMessage("Test Message"));
-
- // Wait for up to the timeout for a redelivery exception to be returned.
- errorEvent.WaitOne(TIMEOUT, true);
-
- // Asserts that a redelivery exception was returned, and is of the correct type.
- Type expectedException = typeof(AMQUndeliveredException);
- Exception ex = lastErrorException;
-
- Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException);
- Assert.IsInstanceOfType(expectedException, ex.InnerException);
-
- CloseEndPoint(0);
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// MandatoryMessageTest checks that messages sent with the 'mandatory' flag, must either be routed to a valid + /// queue or returned to the sender when no route is available. + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Check default exchange returns unroutable mandatory messages. + /// <tr><td> Check direct exchange returns unroutable mandatory messages. + /// <tr><td> Check headers exchange returns unroutable mandatory messages. + /// <tr><td> Check topic exchange returns unroutable mandatory messages. + /// </table> + /// </summary> + [TestFixture, Category("Integration")] + public class MandatoryMessageTest : BaseMessagingTestFixture + { + /// <summary>Used for debugging purposes.</summary> + private static ILog log = LogManager.GetLogger(typeof(MandatoryMessageTest)); + + /// <summary>Defines the maximum time in milliseconds, to wait for redelivery to occurr.</summary> + public const int TIMEOUT = 1000; + + /// <summary>Defines the name of the routing key to use with the tests.</summary> + public const string TEST_ROUTING_KEY = "unboundkey"; + + /// <summary>Condition used to coordinate receipt of redelivery exception to the sending thread.</summary> + private ManualResetEvent errorEvent; + + /// <summary>Holds the last received error condition, for examination by the tests sending thread.</summary> + private Exception lastErrorException; + + /// <summary> Holds the test connection. </summary> + protected IConnection _connection; + + /// <summary> Holds the test channel. </summary> + protected IChannel _channel; + + [SetUp] + public override void Init() + { + base.Init(); + + errorEvent = new ManualResetEvent(false); + lastErrorException = null; + } + + [TearDown] + public override void Shutdown() + { + base.Shutdown(); + } + + /// <summary> + /// Handles all exception conditions on the connection. The error event is notified and the exception recorded as the last seen. + /// </summary> + /// + /// <param name="e">The asynchronous exception on the connection.</param> + public void OnException(Exception e) + { + lastErrorException = e; + errorEvent.Set(); + } + + [Test] + public void SendUndeliverableMessageOnDirectExchange() + { + SendOne(ExchangeNameDefaults.DIRECT); + } + + [Test] + public void SendUndeliverableMessageOnTopicExchange() + { + SendOne(ExchangeNameDefaults.TOPIC); + } + + [Test] + public void SendUndeliverableMessageOnHeadersExchange() + { + SendOne(ExchangeNameDefaults.HEADERS); + } + + /// <summary> + /// Sends a single message to the specified exchange with the routing key 'unboundkey', marked as mandatory. + /// A check is performed to assert that a redelivery error is returned from the broker for the message. + /// </summary> + /// + /// <param name="exchangeName">The name of the exchange to send to.</param> + private void SendOne(string exchangeName) + { + log.Debug("private void SendOne(string exchangeName = " + exchangeName + "): called"); + + // Send a test message to a unbound key on the specified exchange. + SetUpEndPoint(0, false, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, exchangeName, + true, false, null); + testProducer[0] = testChannel[0].CreatePublisherBuilder() + .WithRoutingKey(TEST_ROUTING_KEY + testId) + .WithMandatory(true) + .WithExchangeName(exchangeName) + .Create(); + + // Set up the exception listener on the connection. + testConnection[0].ExceptionListener = new ExceptionListenerDelegate(OnException); + + // Send message that should fail. + testProducer[0].Send(testChannel[0].CreateTextMessage("Test Message")); + + // Wait for up to the timeout for a redelivery exception to be returned. + errorEvent.WaitOne(TIMEOUT, true); + + // Asserts that a redelivery exception was returned, and is of the correct type. + Type expectedException = typeof(AMQUndeliveredException); + Exception ex = lastErrorException; + + Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException); + Assert.IsInstanceOfType(expectedException, ex.InnerException); + + CloseEndPoint(0); + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs index 876e7c7bf7..bae6c76818 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs @@ -1,167 +1,167 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Text;
-using System.Threading;
-using log4net;
-using NUnit.Framework;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// ProducerMultiConsumerTest provides some tests for one producer and multiple consumers.
- ///
- /// <p><table id="crc"><caption>CRC Card</caption>
- /// <tr><th> Responsibilities <th> Collaborations
- /// <tr><td> Check that all consumers on a topic each receive all message on it.
- /// <tr><td> Check that consumers on the same queue receive each message once accross all consumers.
- /// </table>
- /// </summary>
- [TestFixture, Category("Integration")]
- public class ProducerMultiConsumerTest : BaseMessagingTestFixture
- {
- private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumerTest));
-
- /// <summary>Base name for the routing key used for this test (made unique by adding in test id).</summary>
- private const string TEST_ROUTING_KEY = "ProducerMultiConsumerTest";
-
- /// <summary>The number of consumers to test.</summary>
- private const int CONSUMER_COUNT = 5;
-
- /// <summary>The number of test messages to send.</summary>
- private const int MESSAGE_COUNT = 10;
-
- /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary>
- AutoResetEvent _finishedEvent;
-
- /// <summary>Used to count test messages received so far.</summary>
- private int _messageReceivedCount;
-
- /// <summary>Used to hold the expected number of messages to receive.</summary>
- private int expectedMessageCount;
-
- /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary>
- private bool allReceived;
-
- /// <summary> Creates one producing end-point and many consuming end-points connected on a topic. </summary>
- [SetUp]
- public override void Init()
- {
- base.Init();
-
- // Reset all test counts and flags.
- _messageReceivedCount = 0;
- allReceived = false;
- _finishedEvent = new AutoResetEvent(false);
- }
-
- /// <summary> Cleans up all test end-points. </summary>
- [TearDown]
- public override void Shutdown()
- {
- try
- {
- // Close all end points for producer and consumers.
- // Producer is on 0, and consumers on 1 .. n, so loop is from 0 to n inclusive.
- for (int i = 0; i <= CONSUMER_COUNT; i++)
- {
- CloseEndPoint(i);
- }
- }
- finally
- {
- base.Shutdown();
- }
- }
-
- /// <summary> Check that all consumers on a topic each receive all message on it. </summary>
- [Test]
- public void AllConsumerReceiveAllMessagesOnTopic()
- {
- // Create end-points for all the consumers in the test.
- for (int i = 1; i <= CONSUMER_COUNT; i++)
- {
- SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC,
- true, false, null);
- testConsumer[i].OnMessage += new MessageReceivedDelegate(OnMessage);
- }
-
- // Create an end-point to publish to the test topic.
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC,
- true, false, null);
-
- expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
-
- for (int i = 0; i < MESSAGE_COUNT; i++)
- {
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
- }
-
- _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
-
- // Check that all messages really were received.
- Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount);
- }
-
- /// <summary> Check that consumers on the same queue receive each message once accross all consumers. </summary>
- [Test]
- public void AllConsumerReceiveAllMessagesOnDirect()
- {
- // Create end-points for all the consumers in the test.
- for (int i = 1; i <= CONSUMER_COUNT; i++)
- {
- SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
- true, false, null);
- testConsumer[i].OnMessage += new MessageReceivedDelegate(OnMessage);
- }
-
- // Create an end-point to publish to the test topic.
- SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT,
- true, false, null);
-
- expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT);
-
- for (int i = 0; i < MESSAGE_COUNT; i++)
- {
- testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
- }
-
- _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false);
-
- // Check that all messages really were received.
- Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount);
- }
-
- /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary>
- public void OnMessage(IMessage m)
- {
- int newCount = Interlocked.Increment(ref _messageReceivedCount);
-
- if (newCount >= expectedMessageCount)
- {
- allReceived = true;
- _finishedEvent.Set();
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// ProducerMultiConsumerTest provides some tests for one producer and multiple consumers. + /// + /// <p><table id="crc"><caption>CRC Card</caption> + /// <tr><th> Responsibilities <th> Collaborations + /// <tr><td> Check that all consumers on a topic each receive all message on it. + /// <tr><td> Check that consumers on the same queue receive each message once accross all consumers. + /// </table> + /// </summary> + [TestFixture, Category("Integration")] + public class ProducerMultiConsumerTest : BaseMessagingTestFixture + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumerTest)); + + /// <summary>Base name for the routing key used for this test (made unique by adding in test id).</summary> + private const string TEST_ROUTING_KEY = "ProducerMultiConsumerTest"; + + /// <summary>The number of consumers to test.</summary> + private const int CONSUMER_COUNT = 5; + + /// <summary>The number of test messages to send.</summary> + private const int MESSAGE_COUNT = 10; + + /// <summary>Monitor used to signal succesfull receipt of all test messages.</summary> + AutoResetEvent _finishedEvent; + + /// <summary>Used to count test messages received so far.</summary> + private int _messageReceivedCount; + + /// <summary>Used to hold the expected number of messages to receive.</summary> + private int expectedMessageCount; + + /// <summary>Flag used to indicate that all messages really were received, and that the test did not just time out. </summary> + private bool allReceived; + + /// <summary> Creates one producing end-point and many consuming end-points connected on a topic. </summary> + [SetUp] + public override void Init() + { + base.Init(); + + // Reset all test counts and flags. + _messageReceivedCount = 0; + allReceived = false; + _finishedEvent = new AutoResetEvent(false); + } + + /// <summary> Cleans up all test end-points. </summary> + [TearDown] + public override void Shutdown() + { + try + { + // Close all end points for producer and consumers. + // Producer is on 0, and consumers on 1 .. n, so loop is from 0 to n inclusive. + for (int i = 0; i <= CONSUMER_COUNT; i++) + { + CloseEndPoint(i); + } + } + finally + { + base.Shutdown(); + } + } + + /// <summary> Check that all consumers on a topic each receive all message on it. </summary> + [Test] + public void AllConsumerReceiveAllMessagesOnTopic() + { + // Create end-points for all the consumers in the test. + for (int i = 1; i <= CONSUMER_COUNT; i++) + { + SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC, + true, false, null); + testConsumer[i].OnMessage += new MessageReceivedDelegate(OnMessage); + } + + // Create an end-point to publish to the test topic. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.TOPIC, + true, false, null); + + expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT); + + for (int i = 0; i < MESSAGE_COUNT; i++) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + } + + _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); + + // Check that all messages really were received. + Assert.IsTrue(allReceived, "All messages were not received, only got " + _messageReceivedCount + " but wanted " + expectedMessageCount); + } + + /// <summary> Check that consumers on the same queue receive each message once accross all consumers. </summary> + [Test] + public void AllConsumerReceiveAllMessagesOnDirect() + { + // Create end-points for all the consumers in the test. + for (int i = 1; i <= CONSUMER_COUNT; i++) + { + SetUpEndPoint(i, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT, + true, false, null); + testConsumer[i].OnMessage += new MessageReceivedDelegate(OnMessage); + } + + // Create an end-point to publish to the test topic. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT, + true, false, null); + + expectedMessageCount = (MESSAGE_COUNT * CONSUMER_COUNT); + + for (int i = 0; i < MESSAGE_COUNT; i++) + { + testProducer[0].Send(testChannel[0].CreateTextMessage("A")); + } + + _finishedEvent.WaitOne(new TimeSpan(0, 0, 0, 30), false); + + // Check that all messages really were received. + Assert.IsTrue(allReceived, "All messages were not received, only got: " + _messageReceivedCount + " but wanted " + expectedMessageCount); + } + + /// <summary> Atomically increments the message count on every message, and signals once all messages in the test are received. </summary> + public void OnMessage(IMessage m) + { + int newCount = Interlocked.Increment(ref _messageReceivedCount); + + if (newCount >= expectedMessageCount) + { + allReceived = true; + _finishedEvent.Set(); + } + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs index 1c104d1451..5f953e1470 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs @@ -1,64 +1,64 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.IO;
-using System.Reflection;
-using System.Security.Cryptography.X509Certificates;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// Test SSL/TLS connections to the broker
- /// </summary>
- [TestFixture, Category("Integration")]
- public class SslConnectionTest
- {
- /// <summary>
- /// Make a test TLS connection to the broker
- /// without using client-certificates
- /// </summary>
- //[Test]
- public void DoSslConnection()
- {
- // because for tests we don't usually trust the server certificate
- // we need here to tell the client to ignore certificate validation errors
- SslOptions sslConfig = new SslOptions(null, true);
-
- MakeBrokerConnection(sslConfig);
- }
-
- private static void MakeBrokerConnection(SslOptions options)
- {
- IConnectionInfo connectionInfo = new QpidConnectionInfo();
- connectionInfo.VirtualHost = "test";
- connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 8672, options));
-
- using ( IConnection connection = new AMQConnection(connectionInfo) )
- {
- Console.WriteLine("connection = " + connection);
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Reflection; +using System.Security.Cryptography.X509Certificates; +using NUnit.Framework; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// Test SSL/TLS connections to the broker + /// </summary> + [TestFixture, Category("Integration")] + public class SslConnectionTest + { + /// <summary> + /// Make a test TLS connection to the broker + /// without using client-certificates + /// </summary> + //[Test] + public void DoSslConnection() + { + // because for tests we don't usually trust the server certificate + // we need here to tell the client to ignore certificate validation errors + SslOptions sslConfig = new SslOptions(null, true); + + MakeBrokerConnection(sslConfig); + } + + private static void MakeBrokerConnection(SslOptions options) + { + IConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.VirtualHost = "test"; + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 8672, options)); + + using ( IConnection connection = new AMQConnection(connectionInfo) ) + { + Console.WriteLine("connection = " + connection); + } + } + } +} diff --git a/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs b/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs index aca3f396d6..4074055eba 100644 --- a/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs +++ b/dotnet/Qpid.Integration.Tests/testcases/SustainedTest.cs @@ -1,109 +1,109 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.IO;
-using System.Reflection;
-using System.Threading;
-using NUnit.Framework;
-using Apache.Qpid.Client.Qms;
-using Apache.Qpid.Client;
-using Apache.Qpid.Messaging;
-using log4net;
-
-namespace Apache.Qpid.Integration.Tests.testcases
-{
- /// <summary>
- /// Runs through the range of ack modes for each test case, sending and recieving a large number of messages
- /// </summary>
- [TestFixture, Category("Integration")]
- public class SustainedTest : BaseMessagingTestFixture
- {
- /// <summary>The number of test messages to send.</summary>
- private const int MESSAGE_COUNT = 50;//00;
-
- /// <summary>Base name for the routing key used for this test (made unique by adding in test id).</summary>
- private const string TEST_ROUTING_KEY = "MessageOrderTest";
-
- /// <summary>
- /// The logger
- /// </summary>
- private static ILog _logger = LogManager.GetLogger(typeof(SustainedTest));
-
- [Test]
- public void MessageOrderTestAutoAck()
- {
- MessageOrderTest(AcknowledgeMode.AutoAcknowledge);
- }
-
- [Test]
- public void MessageOrderTestNoAck()
- {
- MessageOrderTest(AcknowledgeMode.NoAcknowledge);
- }
-
- public void MessageOrderTest(AcknowledgeMode consumerMode)
- {
-
- // Consumer
- SetUpEndPoint(1, false, true, TEST_ROUTING_KEY, consumerMode, false, ExchangeNameDefaults.DIRECT,
- true, false, null);
-
-
- Console.WriteLine("Starting producer thread");
- Thread prodThread = new Thread(new ThreadStart(SendMessages));
- prodThread.Start();
-
- Thread.Sleep(2000);
- Console.WriteLine("Starting consuming");
- for (int i = 0; i < MESSAGE_COUNT; i++)
- {
- if ((i % 10) == 0)
- {
- Console.WriteLine("Consuming message "+i);
- }
- ConsumeNMessages(1, "Msg"+i, testConsumer[1]);
- }
- prodThread.Join();
- CloseEndPoint(0);
- CloseEndPoint(1);
- }
-
- private static void SendMessages()
- {
- AMQConnection conn = new AMQConnection(QpidConnectionInfo.FromUrl(BaseMessagingTestFixture.connectionUri));
- conn.Start();
- IChannel channel = conn.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
- IMessagePublisher producer = channel.CreatePublisherBuilder().
- WithExchangeName(ExchangeNameDefaults.DIRECT).
- WithRoutingKey(TEST_ROUTING_KEY).
- Create();
-
- for (int i = 0; i < MESSAGE_COUNT ; i++)
- {
- if ((i % 10) == 0)
- {
- Console.WriteLine("Sending message "+i);
- }
- producer.Send(channel.CreateTextMessage("Msg" + i));
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Reflection; +using System.Threading; +using NUnit.Framework; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Client; +using Apache.Qpid.Messaging; +using log4net; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + /// <summary> + /// Runs through the range of ack modes for each test case, sending and recieving a large number of messages + /// </summary> + [TestFixture, Category("Integration")] + public class SustainedTest : BaseMessagingTestFixture + { + /// <summary>The number of test messages to send.</summary> + private const int MESSAGE_COUNT = 50;//00; + + /// <summary>Base name for the routing key used for this test (made unique by adding in test id).</summary> + private const string TEST_ROUTING_KEY = "MessageOrderTest"; + + /// <summary> + /// The logger + /// </summary> + private static ILog _logger = LogManager.GetLogger(typeof(SustainedTest)); + + [Test] + public void MessageOrderTestAutoAck() + { + MessageOrderTest(AcknowledgeMode.AutoAcknowledge); + } + + [Test] + public void MessageOrderTestNoAck() + { + MessageOrderTest(AcknowledgeMode.NoAcknowledge); + } + + public void MessageOrderTest(AcknowledgeMode consumerMode) + { + + // Consumer + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY, consumerMode, false, ExchangeNameDefaults.DIRECT, + true, false, null); + + + Console.WriteLine("Starting producer thread"); + Thread prodThread = new Thread(new ThreadStart(SendMessages)); + prodThread.Start(); + + Thread.Sleep(2000); + Console.WriteLine("Starting consuming"); + for (int i = 0; i < MESSAGE_COUNT; i++) + { + if ((i % 10) == 0) + { + Console.WriteLine("Consuming message "+i); + } + ConsumeNMessages(1, "Msg"+i, testConsumer[1]); + } + prodThread.Join(); + CloseEndPoint(0); + CloseEndPoint(1); + } + + private static void SendMessages() + { + AMQConnection conn = new AMQConnection(QpidConnectionInfo.FromUrl(BaseMessagingTestFixture.connectionUri)); + conn.Start(); + IChannel channel = conn.CreateChannel(false, AcknowledgeMode.AutoAcknowledge); + IMessagePublisher producer = channel.CreatePublisherBuilder(). + WithExchangeName(ExchangeNameDefaults.DIRECT). + WithRoutingKey(TEST_ROUTING_KEY). + Create(); + + for (int i = 0; i < MESSAGE_COUNT ; i++) + { + if ((i % 10) == 0) + { + Console.WriteLine("Sending message "+i); + } + producer.Send(channel.CreateTextMessage("Msg" + i)); + } + } + } +} |
