summaryrefslogtreecommitdiff
path: root/wcf
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2010-04-27 22:10:20 +0000
committerStephen D. Huston <shuston@apache.org>2010-04-27 22:10:20 +0000
commit9cba4c65c2be490a61a324431bacfbb79907ca60 (patch)
tree5c9b5b9943ec169a139ad03e4fc85a52c199a9ee /wcf
parente8235b0d5eaf981a39d71e2df0cf11cf854bd9fe (diff)
downloadqpid-python-9cba4c65c2be490a61a324431bacfbb79907ca60.tar.gz
Applied patch from QPID-2359 adding functional tests for the transactional AMQP WCF channel.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@938695 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'wcf')
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs4
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs173
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs229
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs72
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs280
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs138
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj9
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs1
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs1
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs30
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs30
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs1
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs61
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs46
-rw-r--r--wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs83
15 files changed, 1141 insertions, 17 deletions
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
index bf20b5083d..23bed6c603 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs
@@ -175,10 +175,10 @@ namespace Apache.Qpid.Test.Channel.Functional
Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work
inputChannel = listener.EndAcceptChannel(acceptResult);
}
- catch (TimeoutException e)
+ catch (TimeoutException)
{
listener.Close();
- throw e;
+ throw;
}
finally
{
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs
new file mode 100644
index 0000000000..fa3b79d3a7
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/BasicTransactionTest.cs
@@ -0,0 +1,173 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.ServiceModel;
+ using System.Threading;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class BasicTransactionTest
+ {
+ private const string SendToUri = "amqp:amq.direct?routingkey=routing_key";
+ private const string ListenUri = "amqp:message_queue";
+
+ private MessageClient client;
+
+ [SetUp]
+ public void Setup()
+ {
+ // Create client
+ this.client = new MessageClient();
+ this.client.NumberOfMessages = 3;
+ this.client.NumberOfIterations = 1;
+
+ // Setup service
+ MessageService.EndpointAddress = ListenUri;
+ MessageService.ContractTypes = new List<Type>();
+ MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
+ }
+
+ [TestCase(true)]
+ [TestCase(false)]
+ public void TransactionalSend(bool commitTransaction)
+ {
+ int expectedMessageCount = 0;
+ this.client.TransactedSend = true;
+
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTSRAttribute));
+ this.client.CommitTransaction = commitTransaction;
+
+ if (commitTransaction)
+ {
+ expectedMessageCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+ }
+
+ // Call Service methods.
+ this.SendMessages(String.Empty);
+
+ // Validate results.
+ int actualMessageCount = Util.GetMessageCountFromQueue(ListenUri);
+ Assert.AreEqual(expectedMessageCount, actualMessageCount, "The actual message count wasn't as expected.");
+ }
+
+ [TestCase("UseTransactionScope", true)]
+ [TestCase("UseTransactionScope", false)]
+ [TestCase("UseTSRAttribute", true)]
+ [TestCase("UseTSRAttribute", false)]
+ public void TransactionalReceive(string testVariation, bool commitTransaction)
+ {
+ bool testPassed = true;
+ int expectedMessageCount = 0;
+ this.client.TransactedSend = false;
+ string transactionOutcome = "Commit";
+
+ switch (testVariation.Trim().ToLower())
+ {
+ case "usetransactionscope":
+ {
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTransactionScope));
+ }
+
+ break;
+ case "usetsrattribute":
+ {
+ MessageService.ContractTypes.Add(typeof(IQueuedServiceUsingTSRAttribute));
+ }
+
+ break;
+ }
+
+ int expectedMethodCallCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count;
+
+ if (!commitTransaction)
+ {
+ expectedMessageCount = expectedMethodCallCount;
+ transactionOutcome = "Abort";
+ }
+
+ MessageService.IntendedInvocationCount = expectedMethodCallCount;
+
+ // Start the service.
+ MessageService.StartService(Util.GetBinding());
+
+ // Call Service methods.
+ this.SendMessages(transactionOutcome);
+
+ // Allow the wcf service to process all the messages before validation.
+ if (!MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false))
+ {
+ Console.WriteLine("The service did not finish processing messages in 10 seconds. Test will be FAILED");
+ testPassed = false;
+ }
+
+ MessageService.StopService();
+
+ // Validate results.
+ if (expectedMethodCallCount > MessageService.TotalMethodCallCount)
+ {
+ Console.WriteLine("The expected method call count was {0} but got {1}.", expectedMethodCallCount, MessageService.TotalMethodCallCount);
+ testPassed = false;
+ }
+
+ int actualMessageCount = Util.GetMessageCountFromQueue(ListenUri);
+ if (expectedMessageCount != actualMessageCount)
+ {
+ Console.WriteLine("The expected message count was {0} but got {1}.", expectedMessageCount, actualMessageCount);
+ testPassed = false;
+ }
+
+ Assert.AreEqual(true, testPassed, "Results not as expected. Testcase FAILED.");
+
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ if (MessageService.IsServiceRunning())
+ {
+ MessageService.StopService();
+ }
+ }
+
+ private void SendMessages(string messageString)
+ {
+ // Create messages to send.
+ string[] messages = new string[this.client.NumberOfMessages];
+ for (int i = 0; i < this.client.NumberOfMessages; ++i)
+ {
+ messages[i] = messageString + " Message " + i;
+ }
+
+ // Create Amqpchannel and send messages.
+ MethodInfo runClientMethod = this.client.GetType().GetMethod("RunTestClient");
+ EndpointAddress address = new EndpointAddress(SendToUri);
+
+ foreach (Type contractType in MessageService.ContractTypes)
+ {
+ MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType);
+ runClientT.Invoke(this.client, new object[] { address, messages });
+ }
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs
new file mode 100644
index 0000000000..35e32ce25a
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelContextParameters.cs
@@ -0,0 +1,229 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+
+ public class ChannelContextParameters
+ {
+ public ChannelContextParameters()
+ {
+ this.NumberOfMessages = 5;
+ this.NumberOfThreads = 1;
+ this.ReceiveTimeout = TimeSpan.FromSeconds(10.0);
+ this.WaitForSender = true;
+ this.UseAcceptChannelTimeout = true;
+ this.CreateChannel = true;
+ this.DoneSendingTimeout = TimeSpan.FromSeconds(10);
+ this.TransactionScopeTimeout = TimeSpan.FromMinutes(1);
+ this.AcceptChannelTimeout = TimeSpan.FromSeconds(10);
+ this.OpenTimeout = TimeSpan.FromSeconds(10);
+ this.ClientCommitDelay = TimeSpan.Zero;
+ this.WaitForChannelTimeout = TimeSpan.FromSeconds(5);
+ this.WaitForMessageTimeout = TimeSpan.FromSeconds(5);
+ }
+
+ public int NumberOfMessages
+ {
+ get;
+ set;
+ }
+
+ public int NumberOfThreads
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan ReceiveTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool SenderShouldAbort
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiverShouldAbort
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncSend
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncReceive
+ {
+ get;
+ set;
+ }
+
+ public bool SendWithoutTransaction
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiveWithoutTransaction
+ {
+ get;
+ set;
+ }
+
+ public bool SendWithMultipleTransactions
+ {
+ get;
+ set;
+ }
+
+ public bool ReceiveWithMultipleTransactions
+ {
+ get;
+ set;
+ }
+
+ public bool CloseBeforeReceivingAll
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForSender
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan DoneSendingTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan TransactionScopeTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan AcceptChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan OpenTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool UseAcceptChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool CreateChannel
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan ClientCommitDelay
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncAccept
+ {
+ get;
+ set;
+ }
+
+ public bool CloseListenerEarly
+ {
+ get;
+ set;
+ }
+
+ public bool AbortTxDatagramAccept
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForChannel
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan WaitForChannelTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncWaitForChannel
+ {
+ get;
+ set;
+ }
+
+ public bool WaitForMessage
+ {
+ get;
+ set;
+ }
+
+ public TimeSpan WaitForMessageTimeout
+ {
+ get;
+ set;
+ }
+
+ public bool AsyncWaitForMessage
+ {
+ get;
+ set;
+ }
+
+ public bool TryReceive
+ {
+ get;
+ set;
+ }
+
+ public bool TryReceiveNullIAsyncResult
+ {
+ get;
+ set;
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs
new file mode 100644
index 0000000000..9cabae3201
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelEntity.cs
@@ -0,0 +1,72 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.Collections.Generic;
+ using System.ServiceModel.Channels;
+
+ public abstract class ChannelEntity
+ {
+ public ChannelEntity(ChannelContextParameters contextParameters, Binding channelBinding)
+ {
+ this.Parameters = contextParameters;
+ this.Binding = channelBinding;
+ this.Results = new List<string>();
+ }
+
+ protected ChannelContextParameters Parameters
+ {
+ get;
+ set;
+ }
+
+ protected Binding Binding
+ {
+ get;
+ set;
+ }
+
+ public List<string> Results
+ {
+ get;
+ set;
+ }
+
+ public abstract void Run(string serviceUri);
+
+ protected void WaitForChannel(IChannelListener listener, bool async, TimeSpan timeout)
+ {
+ bool ret = false;
+
+ if (async)
+ {
+ IAsyncResult result = listener.BeginWaitForChannel(timeout, null, null);
+ ret = listener.EndWaitForChannel(result);
+ }
+ else
+ {
+ ret = listener.WaitForChannel(timeout);
+ }
+
+ this.Results.Add(String.Format("WaitForChannel returned {0}", ret));
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
new file mode 100644
index 0000000000..20af98fa64
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelReceiver.cs
@@ -0,0 +1,280 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelReceiver : ChannelEntity
+ {
+ public ChannelReceiver(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string listenUri)
+ {
+ IChannelListener<IInputChannel> listener = this.Binding.BuildChannelListener<IInputChannel>(new Uri(listenUri));
+ listener.Open();
+
+ if (this.Parameters.WaitForChannel)
+ {
+ this.WaitForChannel(listener, this.Parameters.AsyncWaitForChannel, this.Parameters.WaitForChannelTimeout);
+ }
+
+ this.AcceptChannelAndReceive(listener);
+
+ if (listener.State != CommunicationState.Closed)
+ {
+ listener.Close();
+ }
+ }
+
+ private void AcceptChannelAndReceive(IChannelListener<IInputChannel> listener)
+ {
+ IInputChannel channel;
+ TransactionScope transactionToAbortOnAccept = null;
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept = new TransactionScope(TransactionScopeOption.RequiresNew);
+ }
+
+ if (this.Parameters.AsyncAccept)
+ {
+ IAsyncResult result = listener.BeginAcceptChannel(null, null);
+ channel = listener.EndAcceptChannel(result);
+ }
+ else
+ {
+ channel = listener.AcceptChannel();
+ }
+
+ if (this.Parameters.AbortTxDatagramAccept)
+ {
+ transactionToAbortOnAccept.Dispose();
+ }
+
+ channel.Open();
+ Message message;
+
+ if (this.Parameters.CloseListenerEarly)
+ {
+ listener.Close();
+ }
+
+ try
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message firstMessage = channel.Receive(this.Parameters.ReceiveTimeout);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", firstMessage.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ }
+ catch (TimeoutException)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add("Receive timed out.");
+ }
+
+ channel.Abort();
+ return;
+ }
+
+ AutoResetEvent doneReceiving = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ do
+ {
+ if (this.Parameters.ReceiverShouldAbort)
+ {
+ this.ReceiveMessage(channel, false);
+ Thread.Sleep(200);
+ }
+
+ message = this.ReceiveMessage(channel, true);
+ }
+ while (message != null);
+
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneReceiving.Set();
+ }
+ }));
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneReceiving.WaitOne(threadTimeout, false))
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+
+ channel.Close();
+ }
+
+ private Message ReceiveMessage(IInputChannel channel, bool commit)
+ {
+ Message message = null;
+
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required))
+ {
+ bool messageDetected = false;
+ if (this.Parameters.AsyncWaitForMessage)
+ {
+ IAsyncResult result = channel.BeginWaitForMessage(this.Parameters.WaitForMessageTimeout, null, null);
+ messageDetected = channel.EndWaitForMessage(result);
+ }
+ else
+ {
+ messageDetected = channel.WaitForMessage(this.Parameters.WaitForMessageTimeout);
+ }
+
+ if (this.Parameters.WaitForMessage)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("WaitForMessage returned {0}", messageDetected));
+ }
+ }
+
+ if (messageDetected)
+ {
+ if (this.Parameters.AsyncReceive)
+ {
+ if (this.Parameters.TryReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ bool ret = channel.EndTryReceive(result, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ IAsyncResult result = channel.BeginReceive(this.Parameters.ReceiveTimeout, null, null);
+ message = channel.EndReceive(result);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ }
+ }
+ else
+ {
+ try
+ {
+ message = channel.Receive(this.Parameters.ReceiveTimeout);
+ }
+ catch (TimeoutException)
+ {
+ message = null;
+ }
+ }
+ }
+ }
+ else
+ {
+ if (this.Parameters.TryReceive)
+ {
+ bool ret = false;
+ if (this.Parameters.AsyncReceive)
+ {
+ IAsyncResult result = channel.BeginTryReceive(this.Parameters.ReceiveTimeout, null, null);
+ if (this.Parameters.TryReceiveNullIAsyncResult)
+ {
+ try
+ {
+ channel.EndTryReceive(null, out message);
+ }
+ catch (Exception e)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive threw {0}", e.GetType().Name));
+ }
+ }
+ }
+
+ ret = channel.EndTryReceive(result, out message);
+ }
+ else
+ {
+ ret = channel.TryReceive(this.Parameters.ReceiveTimeout, out message);
+ }
+
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("TryReceive returned {0}", ret));
+ this.Results.Add(String.Format("Message was {0}", (message == null ? "null" : "not null")));
+ }
+ }
+
+ message = null;
+ }
+
+ if (commit && message != null)
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Received message with Action '{0}'", message.Headers.Action));
+ }
+
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+
+ return message;
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs
new file mode 100644
index 0000000000..78950dc0d5
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/ChannelSender.cs
@@ -0,0 +1,138 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.Threading;
+ using System.Transactions;
+
+ public class ChannelSender : ChannelEntity
+ {
+ public ChannelSender(ChannelContextParameters contextParameters, Binding channelBinding)
+ : base(contextParameters, channelBinding)
+ {
+ }
+
+ public override void Run(string sendTo)
+ {
+ IChannelFactory<IOutputChannel> factory = this.Binding.BuildChannelFactory<IOutputChannel>();
+ factory.Open();
+
+ if (this.Parameters.CreateChannel)
+ {
+ IOutputChannel channel = factory.CreateChannel(new EndpointAddress(sendTo));
+ this.SendMessages(channel);
+ }
+
+ factory.Close();
+ }
+
+ private void SendMessages(IOutputChannel channel)
+ {
+ channel.Open();
+
+ AutoResetEvent doneSending = new AutoResetEvent(false);
+ int threadsCompleted = 0;
+
+ if (this.Parameters.NumberOfMessages > 0)
+ {
+ this.SendMessage(channel, "FirstMessage", true);
+ }
+
+ if (this.Parameters.NumberOfThreads == 1)
+ {
+ for (int j = 0; j < this.Parameters.NumberOfMessages; ++j)
+ {
+ if (this.Parameters.SenderShouldAbort)
+ {
+ this.SendMessage(channel, "Message " + (j + 1), false);
+ }
+
+ this.SendMessage(channel, "Message " + (j + 1), true);
+ }
+
+ doneSending.Set();
+ }
+ else
+ {
+ for (int i = 0; i < this.Parameters.NumberOfThreads; ++i)
+ {
+ ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object unused)
+ {
+ for (int j = 0; j < this.Parameters.NumberOfMessages / this.Parameters.NumberOfThreads; ++j)
+ {
+ if (this.Parameters.SenderShouldAbort)
+ {
+ this.SendMessage(channel, "Message", false);
+ }
+
+ this.SendMessage(channel, "Message", true);
+ }
+ if (Interlocked.Increment(ref threadsCompleted) == this.Parameters.NumberOfThreads)
+ {
+ doneSending.Set();
+ }
+ }));
+ }
+ }
+
+ TimeSpan threadTimeout = TimeSpan.FromMinutes(2.0);
+ if (!doneSending.WaitOne(threadTimeout, false))
+ {
+ lock (this.Results)
+ {
+ this.Results.Add(String.Format("Threads did not complete within {0}.", threadTimeout));
+ }
+ }
+
+ doneSending.Close();
+ channel.Close();
+ }
+
+ private void SendMessage(IOutputChannel channel, string action, bool commit)
+ {
+ using (TransactionScope ts = new TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ Message message = Message.CreateMessage(MessageVersion.Default, action);
+
+ if (this.Parameters.AsyncSend)
+ {
+ IAsyncResult result = channel.BeginSend(message, null, null);
+ channel.EndSend(result);
+ }
+ else
+ {
+ channel.Send(message);
+ }
+
+ if (commit)
+ {
+ ts.Complete();
+ }
+ else
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+ }
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj b/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj
index d01cd99ff5..ab36222d6a 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj
@@ -62,6 +62,7 @@ under the License.
<Reference Include="System.ServiceModel">
<RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
+ <Reference Include="System.Transactions" />
<Reference Include="System.Xml.Linq">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
@@ -73,18 +74,26 @@ under the License.
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncTest.cs" />
+ <Compile Include="ChannelAbortCommitTest.cs" />
+ <Compile Include="ChannelContextParameters.cs" />
+ <Compile Include="ChannelEntity.cs" />
+ <Compile Include="ChannelReceiver.cs" />
+ <Compile Include="ChannelSender.cs" />
<Compile Include="CustomAmqpBindingTest.cs" />
<Compile Include="IGenericObjectService.cs" />
<Compile Include="IInteropService.cs" />
<Compile Include="IQueuedDatagramService1.cs" />
<Compile Include="IQueuedDatagramService2.cs" />
<Compile Include="IQueuedDatagramService3.cs" />
+ <Compile Include="IQueuedServiceUsingTransactionScope.cs" />
+ <Compile Include="IQueuedServiceUsingTSRAttribute.cs" />
<Compile Include="MessageBodyTest.cs" />
<Compile Include="MessagePropertiesTest.cs" />
<Compile Include="MultipleEndpointsSameQueueTest.cs" />
<Compile Include="MessageClient.cs" />
<Compile Include="MessageService.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="BasicTransactionTest.cs" />
<Compile Include="Util.cs" />
</ItemGroup>
<ItemGroup>
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs
index bdbbb82f7e..8abbe04874 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs
@@ -20,7 +20,6 @@
namespace Apache.Qpid.Test.Channel.Functional
{
using System.ServiceModel;
- using System.ServiceModel.Channels;
[ServiceContract(SessionMode = SessionMode.NotAllowed)]
public interface IQueuedDatagramService1
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs
index 565f7aa27b..7d056e9c82 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs
@@ -20,7 +20,6 @@
namespace Apache.Qpid.Test.Channel.Functional
{
using System.ServiceModel;
- using System.ServiceModel.Channels;
[ServiceContract(SessionMode = SessionMode.NotAllowed)]
public interface IQueuedDatagramService2
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs
new file mode 100644
index 0000000000..49c42a25b6
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTSRAttribute.cs
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract]
+ public interface IQueuedServiceUsingTSRAttribute
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs
new file mode 100644
index 0000000000..eabceb5720
--- /dev/null
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedServiceUsingTransactionScope.cs
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.Functional
+{
+ using System.ServiceModel;
+
+ [ServiceContract]
+ public interface IQueuedServiceUsingTransactionScope
+ {
+ [OperationContract(IsOneWay = true)]
+ void Hello(string message);
+ }
+}
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs
index 3fad6b336d..a9555d190d 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs
@@ -20,7 +20,6 @@
namespace Apache.Qpid.Test.Channel.Functional
{
using System;
- using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Channels;
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs
index 8f867551b1..b623a0196b 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs
@@ -23,6 +23,7 @@ namespace Apache.Qpid.Test.Channel.Functional
using System.Reflection;
using System.ServiceModel;
using System.ServiceModel.Channels;
+ using System.Transactions;
public class MessageClient
{
@@ -38,19 +39,36 @@ namespace Apache.Qpid.Test.Channel.Functional
set;
}
- public void RunClient<TServiceContract>(EndpointAddress address)
+ public bool TransactedSend
{
- Binding amqpBinding = Util.GetBinding();
- Type proxyType = typeof(TServiceContract);
- MethodInfo helloMethod = proxyType.GetMethod("Hello");
- MethodInfo goodbyeMethod = proxyType.GetMethod("Goodbye");
+ get;
+ set;
+ }
+ public bool CommitTransaction
+ {
+ get;
+ set;
+ }
+
+ public void RunClient<TServiceContract>(EndpointAddress address)
+ {
string[] messages = new string[this.NumberOfMessages];
for (int i = 0; i < this.NumberOfMessages; ++i)
{
messages[i] = "Message " + i;
}
+ RunTestClient<TServiceContract>(address, messages);
+ }
+
+ public void RunTestClient<TServiceContract>(EndpointAddress address, object[] messages)
+ {
+ Binding amqpBinding = Util.GetBinding();
+ Type proxyType = typeof(TServiceContract);
+ MethodInfo helloMethod = proxyType.GetMethod("Hello");
+ MethodInfo goodbyeMethod = proxyType.GetMethod("Goodbye");
+
for (int i = 0; i < this.NumberOfIterations; ++i)
{
this.CreateChannelAndSendMessages<TServiceContract>(address, amqpBinding, helloMethod, goodbyeMethod, messages);
@@ -76,13 +94,38 @@ namespace Apache.Qpid.Test.Channel.Functional
ChannelFactory<TServiceContract> channelFactory = new ChannelFactory<TServiceContract>(amqpBinding, address);
TServiceContract proxy = channelFactory.CreateChannel();
- foreach (object message in messages)
+ if (this.TransactedSend)
{
- helloMethod.Invoke(proxy, new object[] { message });
+ using (TransactionScope tx = new TransactionScope(TransactionScopeOption.Required, TimeSpan.FromMinutes(20)))
+ {
+ foreach (object message in messages)
+ {
+ helloMethod.Invoke(proxy, new object[] { message });
+ }
+
+ if (goodbyeMethod != null)
+ {
+ goodbyeMethod.Invoke(proxy, new object[0]);
+ }
+
+ if (this.CommitTransaction)
+ {
+ tx.Complete();
+ }
+ }
}
+ else
+ {
+ foreach (object message in messages)
+ {
+ helloMethod.Invoke(proxy, new object[] { message });
+ }
- goodbyeMethod.Invoke(proxy, new object[0]);
- channelFactory.Close();
+ if (goodbyeMethod != null)
+ {
+ goodbyeMethod.Invoke(proxy, new object[0]);
+ }
+ }
}
private void CreateInteropChannelAndSendMessages<TServiceContract>(EndpointAddress address, Binding amqpBinding, MethodInfo helloMethod, int messageCount)
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs
index a473cad986..581464d25e 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs
@@ -24,8 +24,9 @@ namespace Apache.Qpid.Test.Channel.Functional
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;
+ using System.Transactions;
- public class MessageService : IQueuedDatagramService1, IQueuedDatagramService2, IQueuedDatagramService3, IInteropService
+ public class MessageService : IQueuedDatagramService1, IQueuedDatagramService2, IQueuedDatagramService3, IInteropService, IQueuedServiceUsingTransactionScope, IQueuedServiceUsingTSRAttribute
{
private static Dictionary<string, int> methodCallCount = new Dictionary<string, int>();
private static ServiceHost serviceHost;
@@ -48,7 +49,7 @@ namespace Apache.Qpid.Test.Channel.Functional
set;
}
- // The test must set these paramters
+ // The test must set the following paramters
public static List<Type> ContractTypes
{
get;
@@ -87,11 +88,27 @@ namespace Apache.Qpid.Test.Channel.Functional
serviceHost.Open();
}
+ public static bool IsServiceRunning()
+ {
+ return (serviceHost != null && serviceHost.State == CommunicationState.Opened) ? true : false;
+ }
+
public static void StopService()
{
if (serviceHost.State != CommunicationState.Faulted)
{
- serviceHost.Close();
+ try
+ {
+ serviceHost.Close();
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("An exception was thrown while trying to close the service host.\n" + e);
+ }
+ }
+ else
+ {
+ Console.WriteLine("Service Faulted.");
}
}
@@ -106,6 +123,7 @@ namespace Apache.Qpid.Test.Channel.Functional
++methodCallCount[method];
++TotalMethodCallCount;
+
if (TotalMethodCallCount >= IntendedInvocationCount && CompletionHandle != null)
{
CompletionHandle.Set();
@@ -114,6 +132,28 @@ namespace Apache.Qpid.Test.Channel.Functional
}
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedServiceUsingTransactionScope.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedServiceUsingTransactionScope.Hello");
+
+ if (message.Trim().ToLower().StartsWith("abort"))
+ {
+ throw new Exception();
+ }
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
+ void IQueuedServiceUsingTSRAttribute.Hello(string message)
+ {
+ this.UpdateCounts("IQueuedServiceUsingTSRAttribute.Hello");
+
+ if (message.Trim().ToLower().StartsWith("abort"))
+ {
+ Transaction.Current.Rollback();
+ }
+ }
+
+ [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
void IQueuedDatagramService1.Hello(string message)
{
this.UpdateCounts("IQueuedDatagramService1.Hello");
diff --git a/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs b/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs
index 97be1fb925..f08a6fbbfc 100644
--- a/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs
+++ b/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs
@@ -19,6 +19,7 @@
namespace Apache.Qpid.Test.Channel.Functional
{
+ using System;
using System.Collections.Generic;
using System.IO;
using System.ServiceModel;
@@ -70,5 +71,87 @@ namespace Apache.Qpid.Test.Channel.Functional
return brokerBinding;
}
+
+ public static int GetMessageCountFromQueue(string listenUri)
+ {
+ Message receivedMessage = null;
+ int messageCount = 0;
+
+ IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(new Uri(listenUri), new BindingParameterCollection());
+ listener.Open();
+ IInputChannel proxy = listener.AcceptChannel(TimeSpan.FromSeconds(10));
+ proxy.Open();
+
+ while (true)
+ {
+ try
+ {
+ receivedMessage = proxy.Receive(TimeSpan.FromSeconds(3));
+ }
+ catch (Exception e)
+ {
+ if (e.GetType() == typeof(TimeoutException))
+ {
+ break;
+ }
+ else
+ {
+ throw;
+ }
+ }
+
+ messageCount++;
+ }
+
+ listener.Close();
+ return messageCount;
+ }
+
+ public static void PurgeQueue(string listenUri)
+ {
+ GetMessageCountFromQueue(listenUri);
+ }
+
+ public static bool CompareResults(List<string> expectedResults, List<string> actualResults)
+ {
+ IEnumerator<string> actualResultEnumerator = actualResults.GetEnumerator();
+ IEnumerator<string> expectedResultEnumerator = expectedResults.GetEnumerator();
+
+ bool expectedResultEnumeratorPosition = expectedResultEnumerator.MoveNext();
+ bool actualResultEnumeratorPosition = actualResultEnumerator.MoveNext();
+
+ while (true == actualResultEnumeratorPosition &&
+ true == expectedResultEnumeratorPosition)
+ {
+ string expectedResult = expectedResultEnumerator.Current;
+ string actualResult = actualResultEnumerator.Current;
+
+ if (expectedResult.Equals(actualResult) == false)
+ {
+ Console.WriteLine("OrderedResultsComparator: Expected result '{0}', but got '{1}' instead.", expectedResult, actualResult);
+ return false;
+ }
+
+ expectedResultEnumeratorPosition = expectedResultEnumerator.MoveNext();
+ actualResultEnumeratorPosition = actualResultEnumerator.MoveNext();
+ }
+
+ // if either of them has still more data left, its an error
+ if (true == expectedResultEnumeratorPosition)
+ {
+ string expectedResult = expectedResultEnumerator.Current;
+ Console.WriteLine("OrderedResultsComparator: Got fewer results than expected, first missing result: '{0}'", expectedResult);
+ return false;
+ }
+
+ if (true == actualResultEnumeratorPosition)
+ {
+ string actualResult = actualResultEnumerator.Current;
+ Console.WriteLine("OrderedResultsComparator: Got more results than expected, first extra result: '{0}'", actualResult);
+ return false;
+ }
+
+ return true;
+ }
}
}