diff options
Diffstat (limited to 'dotnet/Qpid.Client.Tests/requestreply1')
| -rw-r--r-- | dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs | 66 | ||||
| -rw-r--r-- | dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs | 64 |
2 files changed, 43 insertions, 87 deletions
diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs index 515ae41e1c..ad5981a5c5 100644 --- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs @@ -35,16 +35,15 @@ namespace Qpid.Client.Tests private string _replyToExchangeName; private string _replyToRoutingKey; + const int PACK = 100; private IMessagePublisher _destinationPublisher; + private IMessageConsumer _consumer; private string _serviceName = "ServiceQ1"; private string _selector = null; - //private EventWaitHandle _event = new ManualResetEvent(false); - private AutoResetEvent _event = new AutoResetEvent(false); - [SetUp] public override void Init() { @@ -59,36 +58,38 @@ namespace Qpid.Client.Tests _channel.DeclareQueue(_serviceName, false, false, false); - IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName) + _consumer = _channel.CreateConsumerBuilder(_serviceName) .WithPrefetchLow(100) .WithPrefetchHigh(500) .WithNoLocal(true) .Create(); - consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + _consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + } + + public override void Shutdown() + { + _consumer.Dispose(); + base.Shutdown(); } private void OnConnectionException(Exception e) { _logger.Info("Connection exception occurred", e); - _event.Set(); // Shutdown test on error // XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat? } [Test] - public void TestFail() - { - Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed."); - } - - /*[Test] public void Test() { _connection.Start(); _logger.Info("Waiting..."); - _event.WaitOne(); - }*/ - public void OnMessage(IMessage message) + ServiceRequestingClient client = new ServiceRequestingClient(); + client.Init(); + client.SendMessages(); + } + + private void OnMessage(IMessage message) { // _logger.Info("Got message '" + message + "'"); @@ -109,9 +110,9 @@ namespace Qpid.Client.Tests _destinationPublisher = _channel.CreatePublisherBuilder() .WithExchangeName(_replyToExchangeName) .WithRoutingKey(_replyToRoutingKey) + .WithDeliveryMode(DeliveryMode.NonPersistent) .Create(); _destinationPublisher.DisableMessageTimestamp = true; - _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent; _logger.Debug("After create a producer"); } catch (QpidException e) @@ -120,7 +121,7 @@ namespace Qpid.Client.Tests throw e; } _messageCount++; - if (_messageCount % 1000 == 0) + if (_messageCount % PACK == 0) { _logger.Info("Received message total: " + _messageCount); _logger.Info(string.Format("Sending response to '{0}:{1}'", @@ -129,25 +130,20 @@ namespace Qpid.Client.Tests try { - String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text; - ITextMessage msg = _channel.CreateTextMessage(payload); - if (tm.Headers.Contains("timeSent")) - { -// _logger.Info("timeSent property set on message"); -// _logger.Info("timeSent value is: " + tm.Headers["timeSent"]); - msg.Headers["timeSent"] = tm.Headers["timeSent"]; - } - _destinationPublisher.Send(msg); - if (_messageCount % 1000 == 0) - { - _logger.Info(string.Format("Sending response to '{0}:{1}'", - _replyToExchangeName, _replyToRoutingKey)); - } - } - catch (QpidException e) + String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text; + ITextMessage msg = _channel.CreateTextMessage(payload); + if ( tm.Headers.Contains("timeSent") ) + { + msg.Headers["timeSent"] = tm.Headers["timeSent"]; + } + _destinationPublisher.Send(msg); + } catch ( QpidException e ) { - _logger.Error("Error sending message: " + e, e); - throw e; + _logger.Error("Error sending message: " + e, e); + throw e; + } finally + { + _destinationPublisher.Dispose(); } } } diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs index 4479d767ea..8264879c1f 100644 --- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs @@ -26,18 +26,18 @@ using Qpid.Messaging; namespace Qpid.Client.Tests { - [TestFixture] public class ServiceRequestingClient : BaseMessagingTestFixture { private const int MESSAGE_SIZE = 1024; private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE); - private const int NUM_MESSAGES = 10000; + private const int PACK = 100; + private const int NUM_MESSAGES = PACK*10; // increase when in standalone private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient)); - AutoResetEvent _finishedEvent = new AutoResetEvent(false); - + ManualResetEvent _finishedEvent = new ManualResetEvent(false); + private int _expectedMessageCount = NUM_MESSAGES; private long _startTime = 0; @@ -54,9 +54,9 @@ namespace Qpid.Client.Tests { _publisher = _channel.CreatePublisherBuilder() .WithRoutingKey(_commandQueueName) + .WithDeliveryMode(DeliveryMode.NonPersistent) .Create(); _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder? - _publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX: need a "with" for this in builder? } catch (QpidException e) { @@ -64,7 +64,7 @@ namespace Qpid.Client.Tests } } - /*[Test] + [Test] public void SendMessages() { InitialiseProducer(); @@ -100,47 +100,18 @@ namespace Qpid.Client.Tests // Added timestamp. long timeNow = DateTime.Now.Ticks; string timeSentString = String.Format("{0:G}", timeNow); -// _log.Info(String.Format("timeSent={0} timeSentString={1}", timeNow, timeSentString)); - msg.Headers.SetString("timeSent", timeSentString); - //msg.Headers.SetLong("sentAt", timeNow); + msg.Headers.SetLong("timeSent", timeNow); - try - { - _publisher.Send(msg); - } - catch (Exception e) - { - _log.Error("Error sending message: " + e, e); - //base._port = 5673; - _log.Info("Reconnecting but on port 5673"); - try - { - base.Init(); - InitialiseProducer(); - // cheesy but a quick test - _log.Info("Calling SendMessages again"); - SendMessages(); - } - catch (Exception ex) - { - _log.Error("Totally busted: failed to reconnect: " + ex, ex); - } - } + _publisher.Send(msg); } // Assert that the test finishes within a reasonable amount of time. - const int waitSeconds = 10; + const int waitSeconds = 40; const int waitMilliseconds = waitSeconds * 1000; _log.Info("Finished sending " + _expectedMessageCount + " messages"); _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds)); Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false), String.Format("Expected to finish in {0} seconds", waitSeconds)); - }*/ - - [Test] - public void TestFail() - { - Assert.Fail("Tests in this class do not run on autopilot, but hang forever, so commented out until can be fixed."); } public void OnMessage(IMessage m) @@ -150,23 +121,19 @@ namespace Qpid.Client.Tests _log.Debug("Message received: " + m); } - //if (m.Headers.Contains("sentAt")) if (!m.Headers.Contains("timeSent")) { throw new Exception("Set timeSent!"); } - //long sentAt = m.Headers.GetLong("sentAt"); - long sentAt = Int64.Parse(m.Headers.GetString("timeSent")); + + long sentAt = m.Headers.GetLong("timeSent"); long now = DateTime.Now.Ticks; long latencyTicks = now - sentAt; -// _log.Info(String.Format("latency = {0} ticks ", latencyTicks)); long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond; -// _log.Info(String.Format("latency = {0} ms", latencyMilliseconds)); averager.Add(latencyMilliseconds); - // Output average every 1000 messages. - if (averager.Num % 1000 == 0) + if (averager.Num % PACK == 0) { _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond); _log.Info(String.Format("Average latency (ms) = {0}", averager)); @@ -185,13 +152,6 @@ namespace Qpid.Client.Tests _finishedEvent.Set(); // Notify main thread to quit. } } - - /*public static void Main(String[] args) - { - ServiceRequestingClient c = new ServiceRequestingClient(); - c.Init(); - c.SendMessages(); - }*/ } class Avergager |
