diff options
author | Stephen D. Huston <shuston@apache.org> | 2010-04-27 22:10:20 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2010-04-27 22:10:20 +0000 |
commit | 9cba4c65c2be490a61a324431bacfbb79907ca60 (patch) | |
tree | 5c9b5b9943ec169a139ad03e4fc85a52c199a9ee /wcf | |
parent | e8235b0d5eaf981a39d71e2df0cf11cf854bd9fe (diff) | |
download | qpid-python-9cba4c65c2be490a61a324431bacfbb79907ca60.tar.gz |
Applied patch from QPID-2359 adding functional tests for the transactional AMQP WCF channel.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@938695 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'wcf')
15 files changed, 1141 insertions, 17 deletions
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs index bf20b5083d..23bed6c603 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs @@ -175,10 +175,10 @@ namespace Apache.Qpid.Test.Channel.Functional Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work inputChannel = listener.EndAcceptChannel(acceptResult); } - catch (TimeoutException e) + catch (TimeoutException) { listener.Close(); - throw e; + throw; } finally { diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs new file mode 100644 index 0000000000..fa3b79d3a7 --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs @@ -0,0 +1,173 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.ServiceModel;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class BasicTransactionTest
+ {
+ private const string SendToUri = "amqp:amq.direct?routingkey=routing_key";
+ private const string ListenUri = "amqp:message_queue";
+
+ private MessageClient client;
+
+ [SetUp]
+ public void Setup()
+ {
+ // Create client
+ this.client = new MessageClient();
+ this.client.NumberOfMessages = 3;
+ this.client.NumberOfIterations = 1;
+
+ // Setup service
+ MessageService.EndpointAddress = ListenUri;
+ MessageService.ContractTypes = new List<Type>();
+ MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
+ }
+
+ [TestCase(true)]
+ [TestCase(false)]
+ public void TransactionalSend(bool commitTransaction)
+ {
+ int expectedMessageCount = 0;
+ this.client.TransactedSend = true;
+
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTSRAttribute));
+ this.client.CommitTransaction = commitTransaction;
+
+ if (commitTransaction)
+ {
+ expectedMessageCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+ }
+
+ // Call Service methods.
+ this.SendMessages(String.Empty);
+
+ // Validate results.
+ int actualMessageCount = Util.GetMessageCountFromQueue(ListenUri);
+ Assert.AreEqual(expectedMessageCount, actualMessageCount, "The actual message count wasn't as expected.");
+ }
+
+ [TestCase("UseTransactionScope", true)]
+ [TestCase("UseTransactionScope", false)]
+ [TestCase("UseTSRAttribute", true)]
+ [TestCase("UseTSRAttribute", false)]
+ public void TransactionalReceive(string testVariation, bool commitTransaction)
+ {
+ bool testPassed = true;
+ int expectedMessageCount = 0;
+ this.client.TransactedSend = false;
+ string transactionOutcome = "Commit";
+
+ switch (testVariation.Trim().ToLower())
+ {
+ case "usetransactionscope":
+ {
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTransactionScope));
+ }
+
+ break;
+ case "usetsrattribute":
+ {
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTSRAttribute));
+ }
+
+ break;
+ }
+
+ int expectedMethodCallCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+
+ if (!commitTransaction)
+ {
+ expectedMessageCount = expectedMethodCallCount;
+ transactionOutcome = "Abort";
+ }
+
+ MessageService.IntendedInvocationCount = expectedMethodCallCount;
+
+ // Start the service.
+ MessageService.StartService(Util.GetBinding());
+
+ // Call Service methods.
+ this.SendMessages(transactionOutcome);
+
+ // Allow the wcf service to process all the messages before validation.
+ if (!MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false))
+ {
+ Console.WriteLine("The service did not finish processing messages in 10 seconds. Test will be FAILED");
+ testPassed = false;
+ }
+
+ MessageService.StopService();
+
+ // Validate results.
+ if (expectedMethodCallCount > MessageService.TotalMethodCallCount)
+ {
+ Console.WriteLine("The expected method call count was {0} but got {1}.", expectedMethodCallCount, MessageService.TotalMethodCallCount);
+ testPassed = false;
+ }
+
+ int actualMessageCount = Util.GetMessageCountFromQueue(ListenUri);
+ if (expectedMessageCount != actualMessageCount)
+ {
+ Console.WriteLine("The expected message count was {0} but got {1}.", expectedMessageCount, actualMessageCount);
+ testPassed = false;
+ }
+
+ Assert.AreEqual(true, testPassed, "Results not as expected. Testcase FAILED.");
+
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ if (MessageService.IsServiceRunning())
+ {
+ MessageService.StopService();
+ }
+ }
+
+ private void SendMessages(string messageString)
+ {
+ // Create messages to send.
+ string[] messages = new string[this.client.NumberOfMessages];
+ for (int i = 0; i < this.client.NumberOfMessages; ++i)
+ {
+ messages[i] = messageString + " Message " + i;
+ }
+
+ // Create Amqpchannel and send messages.
+ MethodInfo runClientMethod = this.client.GetType().GetMethod("RunTestClient");
+ EndpointAddress address = new EndpointAddress(SendToUri);
+
+ foreach (Type contractType in MessageService.ContractTypes)
+ {
+ MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType);
+ runClientT.Invoke(this.client, new object[] { address, messages });
+ }
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs new file mode 100644 index 0000000000..35e32ce25a --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs @@ -0,0 +1,229 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+
+ public class ChannelContextParameters
+ {
+ public ChannelContextParameters()
+ {
+ this.NumberOfMessages = 5;
+ this.NumberOfThreads = 1;
+ this.ReceiveTimeout = TimeSpan.FromSeconds(10.0);
+ this.WaitForSender = true;
+ this.UseAcceptChannelTimeout = true;
+ this.CreateChannel = true;
+ this.DoneSendingTimeout = TimeSpan.FromSeconds(10);
+ this.TransactionScopeTimeout = TimeSpan.FromMinutes(1);
+ this.AcceptChannelTimeout = TimeSpan.FromSeconds(10);
+ this.OpenTimeout = TimeSpan.FromSeconds(10);
+ this.ClientCommitDelay = TimeSpan.Zero;
+ this.WaitForChannelTimeout = TimeSpan.FromSeconds(5);
+ this.WaitForMessageTimeout = TimeSpan.FromSeconds(5);
+ }
+
+ public int NumberOfMessages
+ {
+ get;
+ set;
+ }
+
+ public int NumberOfThreads
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan ReceiveTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool SenderShouldAbort
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiverShouldAbort
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncSend
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncReceive
+ {
+ get;
+ set;
+ }
+
+ public bool SendWithoutTransaction
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiveWithoutTransaction
+ {
+ get;
+ set;
+ }
+
+ public bool SendWithMultipleTransactions
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiveWithMultipleTransactions
+ {
+ get;
+ set;
+ }
+
+ public bool CloseBeforeReceivingAll
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForSender
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan DoneSendingTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan TransactionScopeTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan AcceptChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan OpenTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool UseAcceptChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool CreateChannel
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan ClientCommitDelay
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncAccept
+ {
+ get;
+ set;
+ }
+
+ public bool CloseListenerEarly
+ {
+ get;
+ set;
+ }
+
+ public bool AbortTxDatagramAccept
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForChannel
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan WaitForChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncWaitForChannel
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForMessage
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan WaitForMessageTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncWaitForMessage
+ {
+ get;
+ set;
+ }
+
+ public bool TryReceive
+ {
+ get;
+ set;
+ }
+
+ public bool TryReceiveNullIAsyncResult
+ {
+ get;
+ set;
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs new file mode 100644 index 0000000000..9cabae3201 --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs @@ -0,0 +1,72 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.ServiceModel.Channels;
+
+ public abstract class ChannelEntity
+ {
+ public ChannelEntity(ChannelContextParameters contextParameters, Binding channelBinding)
+ {
+ this.Parameters = contextParameters;
+ this.Binding = channelBinding;
+ this.Results = new List<string>();
+ }
+
+ protected ChannelContextParameters Parameters
+ {
+ get;
+ set;
+ }
+
+ protected Binding Binding
+ {
+ get;
+ set;
+ }
+
+ public List<string> Results
+ {
+ get;
+ set;
+ }
+
+ public abstract void Run(string serviceUri);
+
+ protected void WaitForChannel(IChannelListener listener, bool async, TimeSpan timeout)
+ {
+ bool ret = false;
+
+ if (async)
+ {
+ IAsyncResult result = listener.BeginWaitForChannel(timeout, null, null);
+ ret = listener.EndWaitForChannel(result);
+ }
+ else
+ {
+ ret = listener.WaitForChannel(timeout);
+ }
+
+ this.Results.Add(String.Format("WaitForChannel returned {0}", ret));
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs new file mode 100644 index 0000000000..20af98fa64 --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs @@ -0,0 +1,280 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelReceiver : ChannelEntity
+ {
+ public ChannelReceiver(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string listenUri)
+ {
+ IChannelListener<IInputChannel> listener = this.Binding.BuildChannelListener<IInputChannel>(new Uri(listenUri));
+ listener.Open();
+
+ if (this.Parameters.WaitForChannel)
+ {
+ this.WaitForChannel(listener, this.Parameters.AsyncWaitForChannel, this.Parameters.WaitForChannelTimeout);
+ }
+
+ this.AcceptChannelAndReceive(listener);
+
+ if (listener.State != CommunicationState.Closed)
+ {
+ listener.Close();
+ }
+ }
+
+ private void AcceptChannelAndReceive(IChannelListener<IInputChannel> listener)
+ {
+ IInputChannel channel;
+ TransactionScope transactionToAbortOnAccept = null;
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept = new TransactionScope(TransactionScopeOption.RequiresNew);
+ }
+
+ if (this.Parameters.AsyncAccept)
+ {
+ IAsyncResult result = listener.BeginAcceptChannel(null, null);
+ channel = listener.EndAcceptChannel(result);
+ }
+ else
+ {
+ channel = listener.AcceptChannel();
+ }
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept.Dispose();
+ }
+
+ channel.Open();
+ Message message;
+
+ if (this.Parameters.CloseListenerEarly)
+ {
+ listener.Close();
+ }
+
+ try
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message firstMessage = channel.Receive(this.Parameters.ReceiveTimeout);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", firstMessage.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ }
+ catch (TimeoutException)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add("Receive timed out.");
+ }
+
+ channel.Abort();
+ return;
+ }
+
+ AutoResetEvent doneReceiving = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ do
+ {
+ if (this.Parameters.ReceiverShouldAbort)
+ {
+ this.ReceiveMessage(channel, false);
+ Thread.Sleep(200);
+ }
+
+ message = this.ReceiveMessage(channel, true);
+ }
+ while (message != null);
+
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneReceiving.Set();
+ }
+ }));
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneReceiving.WaitOne(threadTimeout, false))
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+
+ channel.Close();
+ }
+
+ private Message ReceiveMessage(IInputChannel channel, bool commit)
+ {
+ Message message = null;
+
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required))
+ {
+ bool messageDetected = false;
+ if (this.Parameters.AsyncWaitForMessage)
+ {
+ IAsyncResult result = channel.BeginWaitForMessage(this.Parameters.WaitForMessageTimeout, null, null);
+ messageDetected = channel.EndWaitForMessage(result);
+ }
+ else
+ {
+ messageDetected = channel.WaitForMessage(this.Parameters.WaitForMessageTimeout);
+ }
+
+ if (this.Parameters.WaitForMessage)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("WaitForMessage returned {0}", messageDetected));
+ }
+ }
+
+ if (messageDetected)
+ {
+ if (this.Parameters.AsyncReceive)
+ {
+ if (this.Parameters.TryReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ bool ret = channel.EndTryReceive(result, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ IAsyncResult result = channel.BeginReceive(this.Parameters.ReceiveTimeout, null, null);
+ message = channel.EndReceive(result);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ message = channel.Receive(this.Parameters.ReceiveTimeout);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = false;
+ if (this.Parameters.AsyncReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ if (this.Parameters.TryReceiveNullIAsyncResult)
+ {
+ try
+ {
+ channel.EndTryReceive(null, out message);
+ }
+ catch (Exception e)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive threw {0}", e.GetType().Name));
+ }
+ }
+ }
+
+ ret = channel.EndTryReceive(result, out message);
+ }
+ else
+ {
+ ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+ }
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ this.Results.Add(String.Format("Message was {0}", (message == null ? "null" : "not null")));
+ }
+ }
+
+ message = null;
+ }
+
+ if (commit && message != null)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", message.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+
+ return message;
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs new file mode 100644 index 0000000000..78950dc0d5 --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs @@ -0,0 +1,138 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelSender : ChannelEntity
+ {
+ public ChannelSender(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string sendTo)
+ {
+ IChannelFactory<IOutputChannel> factory = this.Binding.BuildChannelFactory<IOutputChannel>();
+ factory.Open();
+
+ if (this.Parameters.CreateChannel)
+ {
+ IOutputChannel channel = factory.CreateChannel(new EndpointAddress(sendTo));
+ this.SendMessages(channel);
+ }
+
+ factory.Close();
+ }
+
+ private void SendMessages(IOutputChannel channel)
+ {
+ channel.Open();
+
+ AutoResetEvent doneSending = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ if (this.Parameters.NumberOfMessages > 0)
+ {
+ this.SendMessage(channel, "FirstMessage", true);
+ }
+
+ if (this.Parameters.NumberOfThreads == 1)
+ {
+ for (int j = 0; j < this.Parameters.NumberOfMessages; ++j)
+ {
+ if (this.Parameters.SenderShouldAbort)
+ {
+ this.SendMessage(channel, "Message " + (j + 1), false);
+ }
+
+ this.SendMessage(channel, "Message " + (j + 1), true);
+ }
+
+ doneSending.Set();
+ }
+ else
+ {
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ for (int j = 0; j < this.Parameters.NumberOfMessages / this.Parameters.NumberOfThreads; ++j)
+ {
+ if (this.Parameters.SenderShouldAbort)
+ {
+ this.SendMessage(channel, "Message", false);
+ }
+
+ this.SendMessage(channel, "Message", true);
+ }
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneSending.Set();
+ }
+ }));
+ }
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneSending.WaitOne(threadTimeout, false))
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+ }
+
+ doneSending.Close();
+ channel.Close();
+ }
+
+ private void SendMessage(IOutputChannel channel, string action, bool commit)
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message message = Message.CreateMessage(MessageVersion.Default, action);
+
+ if (this.Parameters.AsyncSend)
+ {
+ IAsyncResult result = channel.BeginSend(message, null, null);
+ channel.EndSend(result);
+ }
+ else
+ {
+ channel.Send(message);
+ }
+
+ if (commit)
+ {
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj b/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj index d01cd99ff5..ab36222d6a 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj @@ -62,6 +62,7 @@ under the License. <Reference Include="System.ServiceModel">
<RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
+ <Reference Include="System.Transactions" />
<Reference Include="System.Xml.Linq">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
@@ -73,18 +74,26 @@ under the License. </ItemGroup>
<ItemGroup>
<Compile Include="AsyncTest.cs" />
+ <Compile Include="ChannelAbortCommitTest.cs" />
+ <Compile Include="ChannelContextParameters.cs" />
+ <Compile Include="ChannelEntity.cs" />
+ <Compile Include="ChannelReceiver.cs" />
+ <Compile Include="ChannelSender.cs" />
<Compile Include="CustomAmqpBindingTest.cs" />
<Compile Include="IGenericObjectService.cs" />
<Compile Include="IInteropService.cs" />
<Compile Include="IQueuedDatagramService1.cs" />
<Compile Include="IQueuedDatagramService2.cs" />
<Compile Include="IQueuedDatagramService3.cs" />
+ <Compile Include="IQueuedServiceUsingTransactionScope.cs" />
+ <Compile Include="IQueuedServiceUsingTSRAttribute.cs" />
<Compile Include="MessageBodyTest.cs" />
<Compile Include="MessagePropertiesTest.cs" />
<Compile Include="MultipleEndpointsSameQueueTest.cs" />
<Compile Include="MessageClient.cs" />
<Compile Include="MessageService.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="BasicTransactionTest.cs" />
<Compile Include="Util.cs" />
</ItemGroup>
<ItemGroup>
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs index bdbbb82f7e..8abbe04874 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs @@ -20,7 +20,6 @@ namespace Apache.Qpid.Test.Channel.Functional { using System.ServiceModel; - using System.ServiceModel.Channels; [ServiceContract(SessionMode = SessionMode.NotAllowed)] public interface IQueuedDatagramService1 diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs index 565f7aa27b..7d056e9c82 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs @@ -20,7 +20,6 @@ namespace Apache.Qpid.Test.Channel.Functional { using System.ServiceModel; - using System.ServiceModel.Channels; [ServiceContract(SessionMode = SessionMode.NotAllowed)] public interface IQueuedDatagramService2 diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs new file mode 100644 index 0000000000..49c42a25b6 --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs @@ -0,0 +1,30 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract]
+ public interface IQueuedServiceUsingTSRAttribute
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs new file mode 100644 index 0000000000..eabceb5720 --- /dev/null +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs @@ -0,0 +1,30 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract]
+ public interface IQueuedServiceUsingTransactionScope
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs index 3fad6b336d..a9555d190d 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs @@ -20,7 +20,6 @@ namespace Apache.Qpid.Test.Channel.Functional { using System; - using System.Collections.Generic; using System.Runtime.Serialization; using System.ServiceModel; using System.ServiceModel.Channels; diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs index 8f867551b1..b623a0196b 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs @@ -23,6 +23,7 @@ namespace Apache.Qpid.Test.Channel.Functional using System.Reflection; using System.ServiceModel; using System.ServiceModel.Channels; + using System.Transactions; public class MessageClient { @@ -38,19 +39,36 @@ namespace Apache.Qpid.Test.Channel.Functional set; } - public void RunClient<TServiceContract>(EndpointAddress address) + public bool TransactedSend { - Binding amqpBinding = Util.GetBinding(); - Type proxyType = typeof(TServiceContract); - MethodInfo helloMethod = proxyType.GetMethod("Hello"); - MethodInfo goodbyeMethod = proxyType.GetMethod("Goodbye"); + get; + set; + } + public bool CommitTransaction + { + get; + set; + } + + public void RunClient<TServiceContract>(EndpointAddress address) + { string[] messages = new string[this.NumberOfMessages]; for (int i = 0; i < this.NumberOfMessages; ++i) { messages[i] = "Message " + i; } + RunTestClient<TServiceContract>(address, messages); + } + + public void RunTestClient<TServiceContract>(EndpointAddress address, object[] messages) + { + Binding amqpBinding = Util.GetBinding(); + Type proxyType = typeof(TServiceContract); + MethodInfo helloMethod = proxyType.GetMethod("Hello"); + MethodInfo goodbyeMethod = proxyType.GetMethod("Goodbye"); + for (int i = 0; i < this.NumberOfIterations; ++i) { this.CreateChannelAndSendMessages<TServiceContract>(address, amqpBinding, helloMethod, goodbyeMethod, messages); @@ -76,13 +94,38 @@ namespace Apache.Qpid.Test.Channel.Functional ChannelFactory<TServiceContract> channelFactory = new ChannelFactory<TServiceContract>(amqpBinding, address); TServiceContract proxy = channelFactory.CreateChannel(); - foreach (object message in messages) + if (this.TransactedSend) { - helloMethod.Invoke(proxy, new object[] { message }); + using (TransactionScope tx = new TransactionScope(TransactionScopeOption.Required, TimeSpan.FromMinutes(20))) + { + foreach (object message in messages) + { + helloMethod.Invoke(proxy, new object[] { message }); + } + + if (goodbyeMethod != null) + { + goodbyeMethod.Invoke(proxy, new object[0]); + } + + if (this.CommitTransaction) + { + tx.Complete(); + } + } } + else + { + foreach (object message in messages) + { + helloMethod.Invoke(proxy, new object[] { message }); + } - goodbyeMethod.Invoke(proxy, new object[0]); - channelFactory.Close(); + if (goodbyeMethod != null) + { + goodbyeMethod.Invoke(proxy, new object[0]); + } + } } private void CreateInteropChannelAndSendMessages<TServiceContract>(EndpointAddress address, Binding amqpBinding, MethodInfo helloMethod, int messageCount) diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs index a473cad986..581464d25e 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs @@ -24,8 +24,9 @@ namespace Apache.Qpid.Test.Channel.Functional using System.ServiceModel; using System.ServiceModel.Channels; using System.Threading; + using System.Transactions; - public class MessageService : IQueuedDatagramService1, IQueuedDatagramService2, IQueuedDatagramService3, IInteropService + public class MessageService : IQueuedDatagramService1, IQueuedDatagramService2, IQueuedDatagramService3, IInteropService, IQueuedServiceUsingTransactionScope, IQueuedServiceUsingTSRAttribute { private static Dictionary<string, int> methodCallCount = new Dictionary<string, int>(); private static ServiceHost serviceHost; @@ -48,7 +49,7 @@ namespace Apache.Qpid.Test.Channel.Functional set; } - // The test must set these paramters + // The test must set the following paramters public static List<Type> ContractTypes { get; @@ -87,11 +88,27 @@ namespace Apache.Qpid.Test.Channel.Functional serviceHost.Open(); } + public static bool IsServiceRunning() + { + return (serviceHost != null && serviceHost.State == CommunicationState.Opened) ? true : false; + } + public static void StopService() { if (serviceHost.State != CommunicationState.Faulted) { - serviceHost.Close(); + try + { + serviceHost.Close(); + } + catch (Exception e) + { + Console.WriteLine("An exception was thrown while trying to close the service host.\n" + e); + } + } + else + { + Console.WriteLine("Service Faulted."); } } @@ -106,6 +123,7 @@ namespace Apache.Qpid.Test.Channel.Functional ++methodCallCount[method]; ++TotalMethodCallCount; + if (TotalMethodCallCount >= IntendedInvocationCount && CompletionHandle != null) { CompletionHandle.Set(); @@ -114,6 +132,28 @@ namespace Apache.Qpid.Test.Channel.Functional } [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedServiceUsingTransactionScope.Hello(string message) + { + this.UpdateCounts("IQueuedServiceUsingTransactionScope.Hello"); + + if (message.Trim().ToLower().StartsWith("abort")) + { + throw new Exception(); + } + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedServiceUsingTSRAttribute.Hello(string message) + { + this.UpdateCounts("IQueuedServiceUsingTSRAttribute.Hello"); + + if (message.Trim().ToLower().StartsWith("abort")) + { + Transaction.Current.Rollback(); + } + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] void IQueuedDatagramService1.Hello(string message) { this.UpdateCounts("IQueuedDatagramService1.Hello"); diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs index 97be1fb925..f08a6fbbfc 100644 --- a/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs +++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs @@ -19,6 +19,7 @@ namespace Apache.Qpid.Test.Channel.Functional { + using System; using System.Collections.Generic; using System.IO; using System.ServiceModel; @@ -70,5 +71,87 @@ namespace Apache.Qpid.Test.Channel.Functional return brokerBinding; } + + public static int GetMessageCountFromQueue(string listenUri) + { + Message receivedMessage = null; + int messageCount = 0; + + IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(new Uri(listenUri), new BindingParameterCollection()); + listener.Open(); + IInputChannel proxy = listener.AcceptChannel(TimeSpan.FromSeconds(10)); + proxy.Open(); + + while (true) + { + try + { + receivedMessage = proxy.Receive(TimeSpan.FromSeconds(3)); + } + catch (Exception e) + { + if (e.GetType() == typeof(TimeoutException)) + { + break; + } + else + { + throw; + } + } + + messageCount++; + } + + listener.Close(); + return messageCount; + } + + public static void PurgeQueue(string listenUri) + { + GetMessageCountFromQueue(listenUri); + } + + public static bool CompareResults(List<string> expectedResults, List<string> actualResults) + { + IEnumerator<string> actualResultEnumerator = actualResults.GetEnumerator(); + IEnumerator<string> expectedResultEnumerator = expectedResults.GetEnumerator(); + + bool expectedResultEnumeratorPosition = expectedResultEnumerator.MoveNext(); + bool actualResultEnumeratorPosition = actualResultEnumerator.MoveNext(); + + while (true == actualResultEnumeratorPosition && + true == expectedResultEnumeratorPosition) + { + string expectedResult = expectedResultEnumerator.Current; + string actualResult = actualResultEnumerator.Current; + + if (expectedResult.Equals(actualResult) == false) + { + Console.WriteLine("OrderedResultsComparator: Expected result '{0}', but got '{1}' instead.", expectedResult, actualResult); + return false; + } + + expectedResultEnumeratorPosition = expectedResultEnumerator.MoveNext(); + actualResultEnumeratorPosition = actualResultEnumerator.MoveNext(); + } + + // if either of them has still more data left, its an error + if (true == expectedResultEnumeratorPosition) + { + string expectedResult = expectedResultEnumerator.Current; + Console.WriteLine("OrderedResultsComparator: Got fewer results than expected, first missing result: '{0}'", expectedResult); + return false; + } + + if (true == actualResultEnumeratorPosition) + { + string actualResult = actualResultEnumerator.Current; + Console.WriteLine("OrderedResultsComparator: Got more results than expected, first extra result: '{0}'", actualResult); + return false; + } + + return true; + } } } |