summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client.Tests/requestreply1
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client.Tests/requestreply1')
-rw-r--r--dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs66
-rw-r--r--dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs64
2 files changed, 43 insertions, 87 deletions
diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
index 515ae41e1c..ad5981a5c5 100644
--- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
+++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
@@ -35,16 +35,15 @@ namespace Qpid.Client.Tests
private string _replyToExchangeName;
private string _replyToRoutingKey;
+ const int PACK = 100;
private IMessagePublisher _destinationPublisher;
+ private IMessageConsumer _consumer;
private string _serviceName = "ServiceQ1";
private string _selector = null;
- //private EventWaitHandle _event = new ManualResetEvent(false);
- private AutoResetEvent _event = new AutoResetEvent(false);
-
[SetUp]
public override void Init()
{
@@ -59,36 +58,38 @@ namespace Qpid.Client.Tests
_channel.DeclareQueue(_serviceName, false, false, false);
- IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName)
+ _consumer = _channel.CreateConsumerBuilder(_serviceName)
.WithPrefetchLow(100)
.WithPrefetchHigh(500)
.WithNoLocal(true)
.Create();
- consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
+
+ public override void Shutdown()
+ {
+ _consumer.Dispose();
+ base.Shutdown();
}
private void OnConnectionException(Exception e)
{
_logger.Info("Connection exception occurred", e);
- _event.Set(); // Shutdown test on error
// XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat?
}
[Test]
- public void TestFail()
- {
- Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
- }
-
- /*[Test]
public void Test()
{
_connection.Start();
_logger.Info("Waiting...");
- _event.WaitOne();
- }*/
- public void OnMessage(IMessage message)
+ ServiceRequestingClient client = new ServiceRequestingClient();
+ client.Init();
+ client.SendMessages();
+ }
+
+ private void OnMessage(IMessage message)
{
// _logger.Info("Got message '" + message + "'");
@@ -109,9 +110,9 @@ namespace Qpid.Client.Tests
_destinationPublisher = _channel.CreatePublisherBuilder()
.WithExchangeName(_replyToExchangeName)
.WithRoutingKey(_replyToRoutingKey)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
.Create();
_destinationPublisher.DisableMessageTimestamp = true;
- _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent;
_logger.Debug("After create a producer");
}
catch (QpidException e)
@@ -120,7 +121,7 @@ namespace Qpid.Client.Tests
throw e;
}
_messageCount++;
- if (_messageCount % 1000 == 0)
+ if (_messageCount % PACK == 0)
{
_logger.Info("Received message total: " + _messageCount);
_logger.Info(string.Format("Sending response to '{0}:{1}'",
@@ -129,25 +130,20 @@ namespace Qpid.Client.Tests
try
{
- String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
- ITextMessage msg = _channel.CreateTextMessage(payload);
- if (tm.Headers.Contains("timeSent"))
- {
-// _logger.Info("timeSent property set on message");
-// _logger.Info("timeSent value is: " + tm.Headers["timeSent"]);
- msg.Headers["timeSent"] = tm.Headers["timeSent"];
- }
- _destinationPublisher.Send(msg);
- if (_messageCount % 1000 == 0)
- {
- _logger.Info(string.Format("Sending response to '{0}:{1}'",
- _replyToExchangeName, _replyToRoutingKey));
- }
- }
- catch (QpidException e)
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
+ ITextMessage msg = _channel.CreateTextMessage(payload);
+ if ( tm.Headers.Contains("timeSent") )
+ {
+ msg.Headers["timeSent"] = tm.Headers["timeSent"];
+ }
+ _destinationPublisher.Send(msg);
+ } catch ( QpidException e )
{
- _logger.Error("Error sending message: " + e, e);
- throw e;
+ _logger.Error("Error sending message: " + e, e);
+ throw e;
+ } finally
+ {
+ _destinationPublisher.Dispose();
}
}
}
diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
index 4479d767ea..8264879c1f 100644
--- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
+++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
@@ -26,18 +26,18 @@ using Qpid.Messaging;
namespace Qpid.Client.Tests
{
- [TestFixture]
public class ServiceRequestingClient : BaseMessagingTestFixture
{
private const int MESSAGE_SIZE = 1024;
private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
- private const int NUM_MESSAGES = 10000;
+ private const int PACK = 100;
+ private const int NUM_MESSAGES = PACK*10; // increase when in standalone
private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
- AutoResetEvent _finishedEvent = new AutoResetEvent(false);
-
+ ManualResetEvent _finishedEvent = new ManualResetEvent(false);
+
private int _expectedMessageCount = NUM_MESSAGES;
private long _startTime = 0;
@@ -54,9 +54,9 @@ namespace Qpid.Client.Tests
{
_publisher = _channel.CreatePublisherBuilder()
.WithRoutingKey(_commandQueueName)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
.Create();
_publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
- _publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX: need a "with" for this in builder?
}
catch (QpidException e)
{
@@ -64,7 +64,7 @@ namespace Qpid.Client.Tests
}
}
- /*[Test]
+ [Test]
public void SendMessages()
{
InitialiseProducer();
@@ -100,47 +100,18 @@ namespace Qpid.Client.Tests
// Added timestamp.
long timeNow = DateTime.Now.Ticks;
string timeSentString = String.Format("{0:G}", timeNow);
-// _log.Info(String.Format("timeSent={0} timeSentString={1}", timeNow, timeSentString));
- msg.Headers.SetString("timeSent", timeSentString);
- //msg.Headers.SetLong("sentAt", timeNow);
+ msg.Headers.SetLong("timeSent", timeNow);
- try
- {
- _publisher.Send(msg);
- }
- catch (Exception e)
- {
- _log.Error("Error sending message: " + e, e);
- //base._port = 5673;
- _log.Info("Reconnecting but on port 5673");
- try
- {
- base.Init();
- InitialiseProducer();
- // cheesy but a quick test
- _log.Info("Calling SendMessages again");
- SendMessages();
- }
- catch (Exception ex)
- {
- _log.Error("Totally busted: failed to reconnect: " + ex, ex);
- }
- }
+ _publisher.Send(msg);
}
// Assert that the test finishes within a reasonable amount of time.
- const int waitSeconds = 10;
+ const int waitSeconds = 40;
const int waitMilliseconds = waitSeconds * 1000;
_log.Info("Finished sending " + _expectedMessageCount + " messages");
_log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
String.Format("Expected to finish in {0} seconds", waitSeconds));
- }*/
-
- [Test]
- public void TestFail()
- {
- Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed.");
}
public void OnMessage(IMessage m)
@@ -150,23 +121,19 @@ namespace Qpid.Client.Tests
_log.Debug("Message received: " + m);
}
- //if (m.Headers.Contains("sentAt"))
if (!m.Headers.Contains("timeSent"))
{
throw new Exception("Set timeSent!");
}
- //long sentAt = m.Headers.GetLong("sentAt");
- long sentAt = Int64.Parse(m.Headers.GetString("timeSent"));
+
+ long sentAt = m.Headers.GetLong("timeSent");
long now = DateTime.Now.Ticks;
long latencyTicks = now - sentAt;
-// _log.Info(String.Format("latency = {0} ticks ", latencyTicks));
long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
-// _log.Info(String.Format("latency = {0} ms", latencyMilliseconds));
averager.Add(latencyMilliseconds);
- // Output average every 1000 messages.
- if (averager.Num % 1000 == 0)
+ if (averager.Num % PACK == 0)
{
_log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
_log.Info(String.Format("Average latency (ms) = {0}", averager));
@@ -185,13 +152,6 @@ namespace Qpid.Client.Tests
_finishedEvent.Set(); // Notify main thread to quit.
}
}
-
- /*public static void Main(String[] args)
- {
- ServiceRequestingClient c = new ServiceRequestingClient();
- c.Init();
- c.SendMessages();
- }*/
}
class Avergager