diff options
| author | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 23:02:46 +0000 |
|---|---|---|
| committer | Tomas Restrepo <tomasr@apache.org> | 2007-05-10 23:02:46 +0000 |
| commit | c10d31cbbbed7b2997816cb9d296c679073b8aa5 (patch) | |
| tree | deb4f80768418007e4fa871d231f4d5620905d9b | |
| parent | 29abd268f7005cf0a952d9c77cf1bbca28d49f28 (diff) | |
| download | qpid-python-c10d31cbbbed7b2997816cb9d296c679073b8aa5.tar.gz | |
Merged revisions 537015-537026 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r537015 | tomasr | 2007-05-10 17:16:49 -0500 (Thu, 10 May 2007) | 1 line
QPID-435: Fix HeadersExchangeTest
........
r537019 | tomasr | 2007-05-10 17:25:01 -0500 (Thu, 10 May 2007) | 1 line
QPID-441 Fix handling of bounced messages
........
r537026 | tomasr | 2007-05-10 17:46:46 -0500 (Thu, 10 May 2007) | 1 line
QPID-398 SSL support for .NET client
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@537031 13f79535-47bb-0310-9956-ffa450edef68
45 files changed, 1528 insertions, 303 deletions
diff --git a/dotnet/NOTICE.txt b/dotnet/NOTICE.txt index ec97abcb86..910974b01c 100644 --- a/dotnet/NOTICE.txt +++ b/dotnet/NOTICE.txt @@ -20,3 +20,7 @@ This product also includes software developed by: Alexei A. Vorontsov or Copyright © 2000-2002 Philip A. Craig. Available under terms based on the zlib/libpng licence. Available from http://www.nunit.org/ + + - The Mentalis Security Library, Copyright © 2002-2006, , The Mentalis.org Team + under tterms based on the BSD license (http://www.mentalis.org/site/license.qpx). + Available from http://www.mentalis.org/soft/projects/seclib/
\ No newline at end of file diff --git a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs index 2700c4afb2..2ab8c00388 100644 --- a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs +++ b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs @@ -96,7 +96,7 @@ namespace Qpid.Client.Tests _consumer = _channel.CreateConsumerBuilder(queueName)
.WithPrefetchLow(100)
.WithPrefetchHigh(500)
- .WithNoLocal(true)
+ .WithNoLocal(false) // make sure we get our own messages
.Create();
// Register this to listen for messages on the consumer.
@@ -188,7 +188,7 @@ namespace Qpid.Client.Tests SendTestMessage(msg, true);
}
- /// <summary>Check that a message matching only some fields of a headers exhcnage is not passed by the exchange.</summary>
+ /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
[Test]
public void TestMatchOneFails()
{
@@ -258,9 +258,9 @@ namespace Qpid.Client.Tests {
FieldTable matchTable = new FieldTable();
- // Currently all String matching must be prefixed by an "S" ("S" for string because of a failing of the FieldType definition).
- matchTable["Smatch1"] = "foo";
- matchTable["Smatch2"] = "";
+ matchTable["match1"] = "foo";
+ matchTable["match2"] = "";
+ matchTable["x-match"] = "all";
return matchTable;
}
diff --git a/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj b/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj index 3d5af00887..21c6ff9ea2 100644 --- a/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj +++ b/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj @@ -46,6 +46,7 @@ <ItemGroup>
<Compile Include="bio\BlockingIo.cs" />
<Compile Include="connection\ConnectionTest.cs" />
+ <Compile Include="connection\SslConnectionTest.cs" />
<Compile Include="failover\FailoverTest.cs" />
<Compile Include="failover\FailoverTxTest.cs" />
<Compile Include="HeadersExchange\HeadersExchangeTest.cs" />
@@ -86,6 +87,7 @@ </ItemGroup>
<ItemGroup>
<None Include="App.config" />
+ <EmbeddedResource Include="connection\QpidTestCert.pfx" />
<None Include="Qpid.Common.DLL.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
diff --git a/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs b/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs index ec0594263f..e18fa15c20 100644 --- a/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs +++ b/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs @@ -32,7 +32,7 @@ namespace Qpid.Client.Tests.Security public void ParsesConfiguration()
{
CallbackHandlerRegistry registry = CallbackHandlerRegistry.Instance;
- Assert.AreEqual(3, registry.Mechanisms.Length);
+ Assert.AreEqual(4, registry.Mechanisms.Length);
Assert.Contains("TEST", registry.Mechanisms);
Type handlerType = registry.GetCallbackHandler("TEST");
diff --git a/dotnet/Qpid.Client.Tests/connection/QpidTestCert.pfx b/dotnet/Qpid.Client.Tests/connection/QpidTestCert.pfx Binary files differnew file mode 100644 index 0000000000..3f1855c6e9 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/connection/QpidTestCert.pfx diff --git a/dotnet/Qpid.Client.Tests/connection/SslConnectionTest.cs b/dotnet/Qpid.Client.Tests/connection/SslConnectionTest.cs new file mode 100644 index 0000000000..b8941c8459 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/connection/SslConnectionTest.cs @@ -0,0 +1,91 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Reflection;
+using System.Security.Cryptography.X509Certificates;
+using NUnit.Framework;
+using Qpid.Client.Qms;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Tests.Connection
+{
+ /// <summary>
+ /// Test SSL/TLS connections to the broker
+ /// </summary>
+ [TestFixture]
+ public class SslConnectionTest
+ {
+ /// <summary>
+ /// Make a test TLS connection to the broker
+ /// without using client-certificates
+ /// </summary>
+ [Test]
+ public void DoSslConnection()
+ {
+ // because for tests we don't usually trust the server certificate
+ // we need here to tell the client to ignore certificate validation errors
+ SslOptions sslConfig = new SslOptions(null, true);
+
+ MakeBrokerConnection(sslConfig);
+ }
+
+ /// <summary>
+ /// Make a TLS connection to the broker with a
+ /// client-side certificate
+ /// </summary>
+ [Test]
+ public void DoSslConnectionWithClientCert()
+ {
+ // because for tests we don't usually trust the server certificate
+ // we need here to tell the client to ignore certificate validation errors
+ SslOptions sslConfig = new SslOptions(LoadClientCert(), true);
+
+ MakeBrokerConnection(sslConfig);
+ }
+
+ private static void MakeBrokerConnection(SslOptions options)
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 8672, options));
+
+ using ( IConnection connection = new AMQConnection(connectionInfo) )
+ {
+ Console.WriteLine("connection = " + connection);
+ }
+ }
+
+ private static X509Certificate LoadClientCert()
+ {
+ // load a self-issued certificate from an embedded
+ // resource
+ const string name = "Qpid.Client.Tests.connection.QpidTestCert.pfx";
+ Assembly assembly = Assembly.GetExecutingAssembly();
+
+ Stream res = assembly.GetManifestResourceStream(name);
+ byte[] buffer = new byte[res.Length];
+ res.Read(buffer, 0, buffer.Length);
+
+ return new X509Certificate(buffer);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs index 68eee90b64..4479d767ea 100644 --- a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs @@ -40,7 +40,7 @@ namespace Qpid.Client.Tests private int _expectedMessageCount = NUM_MESSAGES; - private long _startTime; + private long _startTime = 0; private string _commandQueueName = "ServiceQ1"; diff --git a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs index 84ae2c92c1..bae5b6d8f9 100644 --- a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs +++ b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs @@ -26,69 +26,103 @@ using Qpid.Messaging; namespace Qpid.Client.Tests { - [TestFixture] - public class UndeliverableTest : BaseMessagingTestFixture - { - private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest)); + /// <summary> + /// Tests that when sending undeliverable messages with the + /// mandatory flag set, an exception is raised on the connection + /// as the message is bounced back by the broker + /// </summary> + [TestFixture] + public class UndeliverableTest : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest)); + private ManualResetEvent _event; + public const int TIMEOUT = 1000; + private Exception _lastException; - [SetUp] - public override void Init() - { - base.Init(); + [SetUp] + public override void Init() + { + base.Init(); + _event = new ManualResetEvent(false); + _lastException = null; - try - { - _connection.ExceptionListener = new ExceptionListenerDelegate(OnException); - } - catch (QpidException e) + try + { + _connection.ExceptionListener = new ExceptionListenerDelegate(OnException); + } catch ( QpidException e ) + { + _logger.Error("Could not add ExceptionListener", e); + } + } + + public void OnException(Exception e) + { + // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message. + + _lastException = e; + _logger.Error("OnException handler received connection-level exception", e); + if ( e is QpidException ) + { + QpidException qe = (QpidException)e; + if ( qe.InnerException is AMQUndeliveredException ) { - _logger.Error("Could not add ExceptionListener", e); + AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException; + _logger.Error("inner exception is AMQUndeliveredException", ue); + _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage())); } - } + } + _event.Set(); + } - public static void OnException(Exception e) - { - // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message. + [Test] + public void SendUndeliverableMessageOnDefaultExchange() + { + SendOne("default exchange", null); + } + [Test] + public void SendUndeliverableMessageOnDirectExchange() + { + SendOne("direct exchange", ExchangeNameDefaults.DIRECT); + } + [Test] + public void SendUndeliverableMessageOnTopicExchange() + { + SendOne("topic exchange", ExchangeNameDefaults.TOPIC); + } + [Test] + public void SendUndeliverableMessageOnHeadersExchange() + { + SendOne("headers exchange", ExchangeNameDefaults.HEADERS); + } - _logger.Error("OnException handler received connection-level exception", e); - if (e is QpidException) - { - QpidException qe = (QpidException)e; - if (qe.InnerException is AMQUndeliveredException) - { - AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException; - _logger.Error("inner exception is AMQUndeliveredException", ue); - _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage())); + private void SendOne(string exchangeNameFriendly, string exchangeName) + { + _logger.Info("Sending undeliverable message to " + exchangeNameFriendly); - } - } - } + // Send a test message to a non-existant queue + // on the specified exchange. See if message is returned! + MessagePublisherBuilder builder = _channel.CreatePublisherBuilder() + .WithRoutingKey("Non-existant route key!") + .WithMandatory(true); // necessary so that the server bounces the message back + if ( exchangeName != null ) + { + builder.WithExchangeName(exchangeName); + } + IMessagePublisher publisher = builder.Create(); + publisher.Send(_channel.CreateTextMessage("Hiya!")); - [Test] - public void SendUndeliverableMessage() - { - SendOne("default exchange", null); - SendOne("direct exchange", ExchangeNameDefaults.DIRECT); - SendOne("topic exchange", ExchangeNameDefaults.TOPIC); - SendOne("headers exchange", ExchangeNameDefaults.HEADERS); + // check we received an exception on the connection + // and that it is of the right type + _event.WaitOne(TIMEOUT, true); - Thread.Sleep(1000); // Wait for message returns! - } + Type expectedException = typeof(AMQUndeliveredException); + Exception ex = _lastException; + Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException); - private void SendOne(string exchangeNameFriendly, string exchangeName) - { - _logger.Info("Sending undeliverable message to " + exchangeNameFriendly); + if ( ex.InnerException != null ) + ex = ex.InnerException; - // Send a test message to a non-existant queue on the default exchange. See if message is returned! - MessagePublisherBuilder builder = _channel.CreatePublisherBuilder() - .WithRoutingKey("Non-existant route key!") - .WithMandatory(true); - if (exchangeName != null) - { - builder.WithExchangeName(exchangeName); - } - IMessagePublisher publisher = builder.Create(); - publisher.Send(_channel.CreateTextMessage("Hiya!")); - } - } + Assert.IsInstanceOfType(expectedException, ex); + } + } } diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs index a59670ef5a..da8498f938 100644 --- a/dotnet/Qpid.Client/Client/AMQConnection.cs +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -672,9 +672,9 @@ namespace Qpid.Client } } - public bool AttemptReconnection(String host, int port, bool useSSL) + public bool AttemptReconnection(String host, int port, SslOptions sslConfig) { - IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, useSSL); + IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig); _failoverPolicy.setBroker(bd); @@ -708,10 +708,10 @@ namespace Qpid.Client _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); */ - _transport = new BlockingSocketTransport(brokerDetail.Host, brokerDetail.Port, this); + _transport = new BlockingSocketTransport(); // Connect. - _transport.Open(); + _transport.Connect(brokerDetail, this); _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); _protocolListener.ProtocolSession = _protocolSession; diff --git a/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs new file mode 100644 index 0000000000..ec5944bdac --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs @@ -0,0 +1,45 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Qpid.Common;
+using Qpid.Protocol;
+
+namespace Qpid.Client
+{
+ [Serializable]
+ public class AMQNoConsumersException : AMQUndeliveredException
+ {
+ public AMQNoConsumersException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoConsumersException(string message, object bounced)
+ : base(AMQConstant.NO_CONSUMERS.Code, message, bounced)
+ {
+ }
+ protected AMQNoConsumersException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AMQNoRouteException.cs b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs new file mode 100644 index 0000000000..8f0db1c3d5 --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQNoRouteException.cs @@ -0,0 +1,46 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.Serialization;
+using Qpid.Common;
+using Qpid.Protocol;
+
+namespace Qpid.Client
+{
+ [Serializable]
+ public class AMQNoRouteException : AMQUndeliveredException
+ {
+ public AMQNoRouteException(string message)
+ : this(message, null)
+ {
+ }
+
+ public AMQNoRouteException(string message, object bounced)
+ : base(AMQConstant.NO_ROUTE.Code, message, bounced)
+ {
+ }
+
+ protected AMQNoRouteException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs index f26756ccad..9ae1a49473 100644 --- a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs +++ b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs @@ -36,6 +36,7 @@ namespace Qpid.Client private int _port = 5672; private string _transport = "amqp"; private Hashtable _options = new Hashtable(); + private SslOptions _sslOptions; public AmqBrokerInfo() { @@ -182,6 +183,21 @@ namespace Qpid.Client } } + public AmqBrokerInfo(string transport, string host, int port, SslOptions sslConfig) + : this() + { + _transport = transport; + _host = host; + _port = port; + + if ( sslConfig != null ) + { + SetOption(BrokerInfoConstants.OPTIONS_SSL, "true"); + _sslOptions = sslConfig; + } + } + + public string Host { get { return _host; } @@ -200,6 +216,11 @@ namespace Qpid.Client set { _transport = value; } } + public SslOptions SslOptions + { + get { return _sslOptions; } + } + public string GetOption(string key) { return (string)_options[key]; diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 07650c170b..3471ac3640 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -28,6 +28,7 @@ using Qpid.Client.Message; using Qpid.Collections; using Qpid.Framing; using Qpid.Messaging; +using Qpid.Protocol; namespace Qpid.Client { @@ -568,8 +569,14 @@ namespace Qpid.Client if (_logger.IsDebugEnabled) { _logger.Debug("Message received in session with channel id " + _channelId); - } - _queue.EnqueueBlocking(message); + } + if ( message.DeliverBody == null ) + { + ReturnBouncedMessage(message); + } else + { + _queue.EnqueueBlocking(message); + } } public int DefaultPrefetch @@ -986,5 +993,42 @@ namespace Qpid.Client // FIXME: lock FailoverMutex here? _connection.ProtocolWriter.Write(ackFrame); } + + /// <summary> + /// Handle a message that bounced from the server, creating + /// the corresponding exception and notifying the connection about it + /// </summary> + /// <param name="message">Unprocessed message</param> + private void ReturnBouncedMessage(UnprocessedMessage message) + { + try + { + AbstractQmsMessage bouncedMessage = + _messageFactoryRegistry.CreateMessage( + 0, false, message.ContentHeader, + message.Bodies + ); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + AMQException exception; + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + { + exception = new AMQNoConsumersException(reason, bouncedMessage); + } else if ( errorCode == AMQConstant.NO_ROUTE.Code ) + { + exception = new AMQNoRouteException(reason, bouncedMessage); + } else + { + exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); + } + _connection.ExceptionReceived(exception); + } catch ( Exception ex ) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); + } + + } } } diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs index aa79749b41..dbd09da49c 100644 --- a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs +++ b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs @@ -97,7 +97,8 @@ namespace Qpid.Client.Failover // if _host has value then we are performing a redirect. if (_host != null) { - failoverSucceeded = _connection.AttemptReconnection(_host, _port, false); + // todo: fix SSL support! + failoverSucceeded = _connection.AttemptReconnection(_host, _port, null); } else { diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs index 78526f906f..0bd65a1ace 100644 --- a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs @@ -32,7 +32,7 @@ namespace Qpid.Client.Handler public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) { - _logger.Debug("New JmsBounce method received"); + _logger.Debug("New Basic.Return method received"); UnprocessedMessage msg = new UnprocessedMessage(); msg.DeliverBody = null; msg.BounceBody = (BasicReturnBody) evt.Method; diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs index 0ce8a393c9..7f88dd8219 100644 --- a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs +++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs @@ -44,11 +44,20 @@ namespace Qpid.Client.Handler AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId); evt.ProtocolSession.WriteFrame(frame); - // HACK + if ( errorCode != AMQConstant.REPLY_SUCCESS.Code ) { - _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); - evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason)); + _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); + if ( errorCode == AMQConstant.NO_CONSUMERS.Code ) + throw new AMQNoConsumersException(reason); + if ( errorCode == AMQConstant.NO_ROUTE.Code ) + throw new AMQNoRouteException(reason); + if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code ) + throw new AMQInvalidArgumentException(reason); + if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code ) + throw new AMQInvalidRoutingKeyException(reason); + // any other + throw new AMQChannelClosedException(errorCode, "Error: " + reason); } evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason); } diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs index 42169f31b3..0ca443e3bb 100644 --- a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -271,7 +271,7 @@ namespace Qpid.Client.Protocol id = _queueId++; } - return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id; + return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id; } } } diff --git a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs index 78f13c9f42..944d21ad92 100644 --- a/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs +++ b/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs @@ -92,6 +92,8 @@ namespace Qpid.Client.Security if ( _mechanism2HandlerMap == null )
_mechanism2HandlerMap = new Hashtable();
+ if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
_mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
diff --git a/dotnet/Qpid.Client/Client/SslOptions.cs b/dotnet/Qpid.Client/Client/SslOptions.cs new file mode 100644 index 0000000000..a6488d99ea --- /dev/null +++ b/dotnet/Qpid.Client/Client/SslOptions.cs @@ -0,0 +1,81 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Security.Cryptography.X509Certificates;
+
+namespace Qpid.Client
+{
+ /// <summary>
+ /// Configures SSL-related options to connect to an AMQP broker.
+ /// </summary>
+ /// <remarks>
+ /// If the server certificate is not trusted by the client,
+ /// connection will fail. However, you can set the
+ /// <see cref="IgnoreValidationErrors"/> property to true
+ /// to ignore any certificate verification errors for debugging purposes.
+ /// </remarks>
+ public class SslOptions
+ {
+ private X509Certificate _clientCertificate;
+ private bool _ignoreValidationErrors;
+
+ /// <summary>
+ /// Certificate to present to the broker to authenticate
+ /// this client connection
+ /// </summary>
+ public X509Certificate ClientCertificate
+ {
+ get { return _clientCertificate; }
+ }
+
+ /// <summary>
+ /// If true, the validity of the broker certificate
+ /// will not be verified on connection
+ /// </summary>
+ public bool IgnoreValidationErrors
+ {
+ get { return _ignoreValidationErrors; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance with default values
+ /// (No client certificate, don't ignore validation errors)
+ /// </summary>
+ public SslOptions()
+ {
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="clientCertificate">
+ /// Certificate to use to authenticate the client to the broker
+ /// </param>
+ /// <param name="ignoreValidationErrors">
+ /// If true, ignore any validation errors when validating the server certificate
+ /// </param>
+ public SslOptions(X509Certificate clientCertificate, bool ignoreValidationErrors)
+ {
+ _clientCertificate = clientCertificate;
+ _ignoreValidationErrors = ignoreValidationErrors;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs index d3add546fe..ca7ffad8b3 100644 --- a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs @@ -50,19 +50,18 @@ namespace Qpid.Client.Transport public Queue Read() { ByteBuffer buffer = byteChannel.Read(); + return DecodeAndTrace(buffer); + } + + public IAsyncResult BeginRead(AsyncCallback callback, object state) + { + return byteChannel.BeginRead(callback, state); + } - Queue frames = Decode(buffer); - - // TODO: Refactor to decorator. - if (_protocolTraceLog.IsDebugEnabled) - { - foreach (object o in frames) - { - _protocolTraceLog.Debug(String.Format("READ {0}", o)); - } - } - - return frames; + public Queue EndRead(IAsyncResult result) + { + ByteBuffer buffer = byteChannel.EndRead(result); + return DecodeAndTrace(buffer); } public void Write(IDataBlock o) @@ -72,10 +71,33 @@ namespace Qpid.Client.Transport { _protocolTraceLog.Debug(String.Format("WRITE {0}", o)); } - + // we should be doing an async write, but apparently + // the mentalis library doesn't queue async read/writes + // correctly and throws random IOException's. Stay sync for a while + //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null); byteChannel.Write(Encode(o)); } + private void OnAsyncWriteDone(IAsyncResult result) + { + byteChannel.EndWrite(result); + } + + private Queue DecodeAndTrace(ByteBuffer buffer) + { + Queue frames = Decode(buffer); + + // TODO: Refactor to decorator. + if ( _protocolTraceLog.IsDebugEnabled ) + { + foreach ( object o in frames ) + { + _protocolTraceLog.Debug(String.Format("READ {0}", o)); + } + } + return frames; + } + private ByteBuffer Encode(object o) { SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); diff --git a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs index 4a3ff385a8..0f8f341d48 100644 --- a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs @@ -18,13 +18,54 @@ * under the License. * */ +using System; using Qpid.Buffer; namespace Qpid.Client.Transport { - public interface IByteChannel - { - ByteBuffer Read(); - void Write(ByteBuffer buffer); - } + /// <summary> + /// Represents input/output channels that read + /// and write <see cref="ByteBuffer"/> instances + /// </summary> + public interface IByteChannel + { + /// <summary> + /// Read a <see cref="ByteBuffer"/> from the underlying + /// network stream and any configured filters + /// </summary> + /// <returns>A ByteBuffer, if available</returns> + ByteBuffer Read(); + /// <summary> + /// Begin an asynchronous read operation + /// </summary> + /// <param name="callback">Callback method to call when read operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + IAsyncResult BeginRead(AsyncCallback callback, object state); + /// <summary> + /// End an asynchronous read operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param> + /// <returns>The <see cref="ByteBuffer"/> read</returns> + ByteBuffer EndRead(IAsyncResult result); + /// <summary> + /// Write a <see cref="ByteBuffer"/> to the underlying network + /// stream, going through any configured filters + /// </summary> + /// <param name="buffer"></param> + void Write(ByteBuffer buffer); + /// <summary> + /// Begin an asynchronous write operation + /// </summary> + /// <param name="buffer">Buffer to write</param> + /// <param name="callback">A callback to call when the operation completes</param> + /// <param name="state">State object</param> + /// <returns>An <see cref="System.IAsyncResult"/> object</returns> + IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state); + /// <summary> + /// End an asynchronous write operation + /// </summary> + /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param> + void EndWrite(IAsyncResult result); + } } diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs index 4943a45d68..0379e582d6 100644 --- a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs @@ -18,6 +18,7 @@ * under the License. * */ +using System; using System.Collections; namespace Qpid.Client.Transport @@ -25,5 +26,7 @@ namespace Qpid.Client.Transport public interface IProtocolChannel : IProtocolWriter { Queue Read(); + IAsyncResult BeginRead(AsyncCallback callback, object state); + Queue EndRead(IAsyncResult result); } } diff --git a/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs new file mode 100644 index 0000000000..409b428c01 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs @@ -0,0 +1,38 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// Defines a way to introduce an arbitrary filtering
+ /// stream into the stream chain managed by <see cref="IoHandler"/>
+ /// </summary>
+ public interface IStreamFilter
+ {
+ /// <summary>
+ /// Creates a new filtering stream on top of another
+ /// </summary>
+ /// <param name="lowerStream">Next stream on the stack</param>
+ /// <returns>A new filtering stream</returns>
+ Stream CreateFilterStream(Stream lowerStream);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/dotnet/Qpid.Client/Client/Transport/ITransport.cs index aebe58b439..3d918693bc 100644 --- a/dotnet/Qpid.Client/Client/Transport/ITransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/ITransport.cs @@ -18,14 +18,15 @@ * under the License. * */ +using Qpid.Client.Qms; using Qpid.Client.Protocol; namespace Qpid.Client.Transport { public interface ITransport : IConnectionCloser { - void Open(); - string getLocalEndPoint(); + void Connect(IBrokerInfo broker, AMQConnection connection); + string LocalEndpoint { get; } IProtocolWriter ProtocolWriter { get; } } } diff --git a/dotnet/Qpid.Client/Client/Transport/IoHandler.cs b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs new file mode 100644 index 0000000000..8d1f04f662 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IoHandler.cs @@ -0,0 +1,321 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Threading;
+using log4net;
+using Qpid.Buffer;
+using Qpid.Client.Protocol;
+
+namespace Qpid.Client.Transport
+{
+ /// <summary>
+ /// Responsible for reading and writing
+ /// ByteBuffers from/to network streams, and handling
+ /// the stream filters
+ /// </summary>
+ public class IoHandler : IByteChannel, IDisposable
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(IoHandler));
+ private const int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ private Stream _topStream;
+ private IProtocolListener _protocolListener;
+ private int _readBufferSize;
+
+ public int ReadBufferSize
+ {
+ get { return _readBufferSize; }
+ set { _readBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Initialize a new instance
+ /// </summary>
+ /// <param name="stream">Underlying network stream</param>
+ /// <param name="protocolListener">Protocol listener to report exceptions to</param>
+ public IoHandler(Stream stream, IProtocolListener protocolListener)
+ {
+ if ( stream == null )
+ throw new ArgumentNullException("stream");
+ if ( protocolListener == null )
+ throw new ArgumentNullException("protocolListener");
+
+ // initially, the stream at the top of the filter
+ // chain is the underlying network stream
+ _topStream = stream;
+ _protocolListener = protocolListener;
+ _readBufferSize = DEFAULT_BUFFER_SIZE;
+ }
+
+ /// <summary>
+ /// Adds a new filter on the top of the chain
+ /// </summary>
+ /// <param name="filter">Stream filter to put on top of the chain</param>
+ /// <remarks>
+ /// This should *only* be called during initialization. We don't
+ /// support changing the filter change after the first read/write
+ /// has been done and it's not thread-safe to boot!
+ /// </remarks>
+ public void AddFilter(IStreamFilter filter)
+ {
+ _topStream = filter.CreateFilterStream(_topStream);
+ }
+
+ #region IByteChannel Implementation
+ //
+ // IByteChannel Implementation
+ //
+
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ public ByteBuffer Read()
+ {
+ byte[] bytes = AllocateBuffer();
+
+ int numOctets = _topStream.Read(bytes, 0, bytes.Length);
+
+ return WrapByteArray(bytes, numOctets);
+ }
+
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ byte[] bytes = AllocateBuffer();
+ ReadData rd = new ReadData(callback, state, bytes);
+
+ // only put a callback if the caller wants one.
+ AsyncCallback myCallback = null;
+ if ( callback != null )
+ myCallback = new AsyncCallback(OnAsyncReadDone);
+
+ IAsyncResult result = _topStream.BeginRead(
+ bytes, 0, bytes.Length, myCallback,rd
+ );
+ return new WrappedAsyncResult(result, bytes);
+ }
+
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ WrappedAsyncResult theResult = (WrappedAsyncResult)result;
+ int bytesRead = _topStream.EndRead(theResult.InnerResult);
+ return WrapByteArray(theResult.Buffer, bytesRead);
+ }
+
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ public void Write(ByteBuffer buffer)
+ {
+ try
+ {
+ _topStream.Write(buffer.Array, buffer.Position, buffer.Limit); // FIXME
+ } catch ( Exception e )
+ {
+ _log.Error("Write caused exception", e);
+ _protocolListener.OnException(e);
+ }
+ }
+
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ try
+ {
+ return _topStream.BeginWrite(
+ buffer.Array, buffer.Position, buffer.Limit,
+ callback, state
+ );
+ } catch ( Exception e )
+ {
+ _log.Error("BeginWrite caused exception", e);
+ // not clear if an exception here should be propagated? we still
+ // need to propagate it upwards anyway!
+ _protocolListener.OnException(e);
+ throw;
+ }
+ }
+
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ public void EndWrite(IAsyncResult result)
+ {
+ try
+ {
+ _topStream.EndWrite(result);
+ } catch ( Exception e )
+ {
+ _log.Error("EndWrite caused exception", e);
+ // not clear if an exception here should be propagated?
+ _protocolListener.OnException(e);
+ //throw;
+ }
+ }
+ #endregion // IByteChannel Implementation
+
+ #region IDisposable Implementation
+ //
+ // IDisposable Implementation
+ //
+
+ public void Dispose()
+ {
+ if ( _topStream != null )
+ {
+ _topStream.Close();
+ }
+ }
+
+ #endregion // IDisposable Implementation
+
+ #region Private and Helper Classes/Methods
+ //
+ // Private and Helper Classes/Methods
+ //
+
+ private byte[] AllocateBuffer()
+ {
+ return new byte[ReadBufferSize];
+ }
+
+ private static ByteBuffer WrapByteArray(byte[] bytes, int size)
+ {
+ ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
+ byteBuffer.Limit = size;
+ byteBuffer.Flip();
+
+ return byteBuffer;
+ }
+
+
+ private static void OnAsyncReadDone(IAsyncResult result)
+ {
+ ReadData rd = (ReadData) result.AsyncState;
+ IAsyncResult wrapped = new WrappedAsyncResult(result, rd.Buffer);
+ rd.Callback(wrapped);
+ }
+
+ class ReadData
+ {
+ private object _state;
+ private AsyncCallback _callback;
+ private byte[] _buffer;
+
+ public object State
+ {
+ get { return _state; }
+ }
+
+ public AsyncCallback Callback
+ {
+ get { return _callback; }
+ }
+
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public ReadData(AsyncCallback callback, object state, byte[] buffer)
+ {
+ _callback = callback;
+ _state = state;
+ _buffer = buffer;
+ }
+ }
+
+ class WrappedAsyncResult : IAsyncResult
+ {
+ private IAsyncResult _innerResult;
+ private byte[] _buffer;
+
+ #region IAsyncResult Properties
+ //
+ // IAsyncResult Properties
+ //
+ public bool IsCompleted
+ {
+ get { return _innerResult.IsCompleted; }
+ }
+
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _innerResult.AsyncWaitHandle; }
+ }
+
+ public object AsyncState
+ {
+ get { return _innerResult.AsyncState; }
+ }
+
+ public bool CompletedSynchronously
+ {
+ get { return _innerResult.CompletedSynchronously; }
+ }
+ #endregion // IAsyncResult Properties
+
+ public IAsyncResult InnerResult
+ {
+ get { return _innerResult; }
+ }
+ public byte[] Buffer
+ {
+ get { return _buffer; }
+ }
+
+ public WrappedAsyncResult(IAsyncResult result, byte[] buffer)
+ {
+ if ( result == null )
+ throw new ArgumentNullException("result");
+ if ( buffer == null )
+ throw new ArgumentNullException("buffer");
+
+ _innerResult = result;
+ _buffer = buffer;
+ }
+ }
+
+ #endregion // Private and Helper Classes/Methods
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs index 7a9ead0c06..e69de29bb2 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs @@ -1,117 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Net;
-using System.Net.Sockets;
-using log4net;
-using Qpid.Buffer;
-using Qpid.Client.Protocol;
-
-namespace Qpid.Client.Transport.Socket.Blocking
-{
- class BlockingSocketProcessor : IConnectionCloser
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketProcessor));
-
- string _host;
- int _port;
- System.Net.Sockets.Socket _socket;
- private NetworkStream _networkStream;
- IByteChannel _byteChannel;
- IProtocolListener _protocolListener;
-
- public BlockingSocketProcessor(string host, int port, IProtocolListener protocolListener)
- {
- _host = host;
- _port = port;
- _protocolListener = protocolListener;
- _byteChannel = new ByteChannel(this);
- }
-
- /// <summary>
- /// Synchronous blocking connect.
- /// </summary>
- public void Connect()
- {
- _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
-
- IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
- IPAddress ipAddress = ipHostInfo.AddressList[0];
-
- IPEndPoint ipe = new IPEndPoint(ipAddress, _port);
-
- _socket.Connect(ipe);
- _networkStream = new NetworkStream(_socket, true);
- }
-
- public string getLocalEndPoint()
- {
- return _socket.LocalEndPoint.ToString();
- }
-
- public void Write(ByteBuffer byteBuffer)
- {
- try
- {
- _networkStream.Write(byteBuffer.Array, byteBuffer.Position, byteBuffer.Limit); // FIXME
- }
- catch (Exception e)
- {
- _log.Error("Write caused exception", e);
- _protocolListener.OnException(e);
- }
- }
-
- public ByteBuffer Read()
- {
- const int bufferSize = 4 * 1024; // TODO: Prevent constant allocation of buffers.
- byte[] bytes = new byte[bufferSize];
-
- int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
-
- ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
- byteBuffer.Limit = numOctets;
-
- byteBuffer.Flip();
-
- return byteBuffer;
- }
-
- public void Disconnect()
- {
- _networkStream.Flush();
- _networkStream.Close();
- _socket.Close();
- }
-
- public void Close()
- {
- Disconnect();
- }
-
- public IByteChannel ByteChannel
- {
- get { return _byteChannel; }
- }
- }
-}
-
-
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs index e18eefd493..607b7ca422 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs @@ -20,101 +20,119 @@ */
using System;
using System.Collections;
+using System.IO;
using System.Threading;
-using log4net;
+using Qpid.Client.Qms;
using Qpid.Client.Protocol;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
{
- public class BlockingSocketTransport : ITransport
- {
-// static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketTransport));
+ /// <summary>
+ /// TCP Socket transport supporting both
+ /// SSL and non-SSL connections.
+ /// </summary>
+ public class BlockingSocketTransport : ITransport
+ {
+ // Configuration variables.
+ IProtocolListener _protocolListener;
- // Configuration variables.
- string _host;
- int _port;
- IProtocolListener _protocolListener;
+ // Runtime variables.
+ private ISocketConnector _connector;
+ private IoHandler _ioHandler;
+ private AmqpChannel _amqpChannel;
+ private ManualResetEvent _stopEvent;
- // Runtime variables.
- private BlockingSocketProcessor _socketProcessor;
- private AmqpChannel _amqpChannel;
-
- private ReaderRunner _readerRunner;
- private Thread _readerThread;
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _amqpChannel; }
+ }
+ public string LocalEndpoint
+ {
+ get { return _connector.LocalEndpoint; }
+ }
- public BlockingSocketTransport(string host, int port, AMQConnection connection)
- {
- _host = host;
- _port = port;
- _protocolListener = connection.ProtocolListener;
- }
-
- public void Open()
- {
- _socketProcessor = new BlockingSocketProcessor(_host, _port, _protocolListener);
- _socketProcessor.Connect();
- _amqpChannel = new AmqpChannel(_socketProcessor.ByteChannel);
- _readerRunner = new ReaderRunner(this);
- _readerThread = new Thread(new ThreadStart(_readerRunner.Run));
- _readerThread.Start();
- }
+
+ /// <summary>
+ /// Connect to the specified broker
+ /// </summary>
+ /// <param name="broker">The broker to connect to</param>
+ /// <param name="connection">The AMQ connection</param>
+ public void Connect(IBrokerInfo broker, AMQConnection connection)
+ {
+ _stopEvent = new ManualResetEvent(false);
+ _protocolListener = connection.ProtocolListener;
- public string getLocalEndPoint()
- {
- return _socketProcessor.getLocalEndPoint();
- }
+ _ioHandler = MakeBrokerConnection(broker, connection);
+ // todo: get default read size from config!
- public void Close()
- {
- StopReaderThread();
- _socketProcessor.Disconnect();
- }
+ _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ // post an initial async read
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
+ }
- public IProtocolChannel ProtocolChannel { get { return _amqpChannel; } }
- public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } }
+ /// <summary>
+ /// Close the broker connection
+ /// </summary>
+ public void Close()
+ {
+ StopReading();
+ CloseBrokerConnection();
+ }
- public void StopReaderThread()
- {
- _readerRunner.Stop();
- }
+ private void StopReading()
+ {
+ _stopEvent.Set();
+ }
- class ReaderRunner
- {
- BlockingSocketTransport _transport;
- bool _running = true;
+ private void CloseBrokerConnection()
+ {
+ if ( _ioHandler != null )
+ {
+ _ioHandler.Dispose();
+ _ioHandler = null;
+ }
+ if ( _connector != null )
+ {
+ _connector.Dispose();
+ _connector = null;
+ }
+ }
- public ReaderRunner(BlockingSocketTransport transport)
- {
- _transport = transport;
- }
+ private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
+ {
+ if ( broker.UseSSL )
+ {
+ _connector = new SslSocketConnector();
+ } else
+ {
+ _connector = new SocketConnector();
+ }
- public void Run()
- {
- try
- {
- while (_running)
- {
- Queue frames = _transport.ProtocolChannel.Read();
+ Stream stream = _connector.Connect(broker);
+ return new IoHandler(stream, connection.ProtocolListener);
+ }
- foreach (IDataBlock dataBlock in frames)
- {
- _transport._protocolListener.OnMessage(dataBlock);
- }
- }
- }
- catch (Exception e)
- {
- _transport._protocolListener.OnException(e);
- }
- }
+ private void OnAsyncReadDone(IAsyncResult result)
+ {
+ try
+ {
+ Queue frames = _amqpChannel.EndRead(result);
+ // if we're not stopping, post a read again
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
- public void Stop()
+ // process results
+ foreach ( IDataBlock dataBlock in frames )
{
- // TODO: Check if this is thread safe. running is not volitile....
- _running = false;
+ _protocolListener.OnMessage(dataBlock);
}
- }
- }
+ } catch ( Exception e )
+ {
+ _protocolListener.OnException(e);
+ }
+ }
+ }
}
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs index a520815f84..ff2c301a91 100644 --- a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs @@ -29,16 +29,16 @@ namespace Qpid.Client.Transport.Socket.Blocking // Warning: don't use this log for regular logging.
private static readonly ILog _ioTraceLog = LogManager.GetLogger("Qpid.Client.ByteChannel.Tracing");
- BlockingSocketProcessor processor;
+ private IByteChannel _lowerChannel;
- public ByteChannel(BlockingSocketProcessor processor)
+ public ByteChannel(IByteChannel lowerChannel)
{
- this.processor = processor;
+ _lowerChannel = lowerChannel;
}
public ByteBuffer Read()
{
- ByteBuffer result = processor.Read();
+ ByteBuffer result = _lowerChannel.Read();
// TODO: Move into decorator.
if (_ioTraceLog.IsDebugEnabled)
@@ -49,6 +49,21 @@ namespace Qpid.Client.Transport.Socket.Blocking return result;
}
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _lowerChannel.BeginRead(callback, state);
+ }
+
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _lowerChannel.EndRead(result);
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", buffer));
+ }
+ return buffer;
+ }
+
public void Write(ByteBuffer buffer)
{
// TODO: Move into decorator.
@@ -56,8 +71,22 @@ namespace Qpid.Client.Transport.Socket.Blocking {
_ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
}
-
- processor.Write(buffer);
+
+ _lowerChannel.Write(buffer);
+ }
+
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ return _lowerChannel.BeginWrite(buffer, callback, state);
+ }
+
+ public void EndWrite(IAsyncResult result)
+ {
+ _lowerChannel.EndWrite(result);
}
- }
+ }
}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs new file mode 100644 index 0000000000..ac0dc37a16 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs @@ -0,0 +1,33 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ interface ISocketConnector : IDisposable
+ {
+ string LocalEndpoint { get; }
+ Stream Connect(IBrokerInfo broker);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs new file mode 100644 index 0000000000..a651413581 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs @@ -0,0 +1,70 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using Qpid.Client.Qms;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TCP connection over regular sockets.
+ /// </summary>
+ class SocketConnector : ISocketConnector
+ {
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ class MyTcpClient : TcpClient
+ {
+ public MyTcpClient(string host, int port)
+ : base(host, port)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+ }
+
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs new file mode 100644 index 0000000000..24c3f5bcb8 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs @@ -0,0 +1,107 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.IO;
+using System.Net;
+using log4net;
+using Qpid.Client.Qms;
+using Org.Mentalis.Security.Ssl;
+using MCertificate = Org.Mentalis.Security.Certificates.Certificate;
+using MCertificateChain = Org.Mentalis.Security.Certificates.CertificateChain;
+
+namespace Qpid.Client.Transport.Socket.Blocking
+{
+ /// <summary>
+ /// Implements a TLS v1.0 connection using the Mentalis.org library
+ /// </summary>
+ /// <remarks>
+ /// It would've been easier to implement this at the StreamFilter
+ /// level, but unfortunately the Mentalis library doesn't support
+ /// a passthrough SSL stream class and is tied directly
+ /// to socket-like classes.
+ /// </remarks>
+ class SslSocketConnector : ISocketConnector
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(SslSocketConnector));
+ private MyTcpClient _tcpClient;
+
+ public string LocalEndpoint
+ {
+ get { return _tcpClient.LocalEndpoint.ToString(); }
+ }
+
+ public Stream Connect(IBrokerInfo broker)
+ {
+ MCertificate cert = GetClientCert(broker);
+ SecurityOptions options = new SecurityOptions(
+ SecureProtocol.Tls1, cert, ConnectionEnd.Client
+ );
+ if ( broker.SslOptions != null
+ && broker.SslOptions.IgnoreValidationErrors )
+ {
+ _logger.Warn("Ignoring any certificate validation errors during SSL handshake...");
+ options.VerificationType = CredentialVerification.None;
+ }
+
+ _tcpClient = new MyTcpClient(broker.Host, broker.Port, options);
+ return _tcpClient.GetStream();
+ }
+
+ public void Dispose()
+ {
+ if ( _tcpClient != null )
+ {
+ _tcpClient.Close();
+ _tcpClient = null;
+ }
+ }
+
+ private static MCertificate GetClientCert(IBrokerInfo broker)
+ {
+ // if a client certificate is configured,
+ // use that to enable mutual authentication
+ MCertificate cert = null;
+ if ( broker.SslOptions != null
+ && broker.SslOptions.ClientCertificate != null )
+ {
+ cert = MCertificate.CreateFromX509Certificate(
+ broker.SslOptions.ClientCertificate
+ );
+ _logger.DebugFormat("Using Client Certificate for SSL '{0}'", cert.ToString(true));
+ }
+ return cert;
+ }
+
+ class MyTcpClient : SecureTcpClient
+ {
+ public MyTcpClient(string host, int port, SecurityOptions options)
+ : base(host, port, options)
+ {
+ }
+
+ public EndPoint LocalEndpoint
+ {
+ get { return Client.LocalEndPoint; }
+ }
+
+ }
+
+ }
+}
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj index 19d2180a09..659dba5ecc 100644 --- a/dotnet/Qpid.Client/Qpid.Client.csproj +++ b/dotnet/Qpid.Client/Qpid.Client.csproj @@ -32,6 +32,10 @@ <SpecificVersion>False</SpecificVersion>
<HintPath>..\Qpid.Common\lib\log4net\log4net.dll</HintPath>
</Reference>
+ <Reference Include="Org.Mentalis.Security, Version=1.0.13.716, Culture=neutral, PublicKeyToken=085a8f6006888436">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Qpid.Common\lib\seclib-1.0.0\Org.Mentalis.Security.dll</HintPath>
+ </Reference>
<Reference Include="System" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
@@ -43,7 +47,10 @@ <Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+ <Compile Include="Client\SslOptions.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
<Compile Include="Client\BasicMessageConsumer.cs" />
@@ -97,14 +104,18 @@ <Compile Include="Client\State\StateWaiter.cs" />
<Compile Include="Client\Transport\AmqpChannel.cs" />
<Compile Include="Client\Transport\AMQProtocolProvider.cs" />
+ <Compile Include="Client\Transport\IStreamFilter.cs" />
+ <Compile Include="Client\Transport\IoHandler.cs" />
<Compile Include="Client\Transport\IByteChannel.cs" />
<Compile Include="Client\Transport\IProtocolChannel.cs" />
<Compile Include="Client\Transport\IProtocolWriter.cs" />
<Compile Include="Client\Transport\ITransport.cs" />
<Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
- <Compile Include="Client\Transport\Socket\Blocking\BlockingSocketProcessor.cs" />
<Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />
+ <Compile Include="Client\Transport\Socket\Blocking\SslSocketConnector.cs" />
+ <Compile Include="Client\Transport\Socket\Blocking\SocketConnector.cs" />
+ <Compile Include="Client\Transport\Socket\Blocking\ISocketConnector.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="qms\BrokerInfo.cs" />
<Compile Include="qms\ConnectionInfo.cs" />
@@ -144,4 +155,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file diff --git a/dotnet/Qpid.Client/qms/BrokerInfo.cs b/dotnet/Qpid.Client/qms/BrokerInfo.cs index 69cd1bdbf7..3097e01149 100644 --- a/dotnet/Qpid.Client/qms/BrokerInfo.cs +++ b/dotnet/Qpid.Client/qms/BrokerInfo.cs @@ -24,7 +24,7 @@ namespace Qpid.Client.Qms { /// <summary> /// Know URL option names. - /// <seealso cref="ConnectionInfo"/> + /// <seealso cref="IConnectionInfo"/> /// </summary> public class BrokerInfoConstants { @@ -47,6 +47,7 @@ namespace Qpid.Client.Qms string Transport { get; set; } bool UseSSL { get; set; } long Timeout { get; set; } + SslOptions SslOptions { get; } String GetOption(string key); void SetOption(string key, string value); diff --git a/dotnet/Qpid.Common/AMQInvalidArgumentException.cs b/dotnet/Qpid.Common/AMQInvalidArgumentException.cs new file mode 100644 index 0000000000..2220c3152a --- /dev/null +++ b/dotnet/Qpid.Common/AMQInvalidArgumentException.cs @@ -0,0 +1,46 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Runtime.Serialization;
+
+using Qpid.Protocol;
+
+namespace Qpid
+{
+ /// <summary>
+ /// Thrown when an invalid argument was supplied to the broker
+ /// </summary>
+ [Serializable]
+ public class AMQInvalidArgumentException : AMQException
+ {
+ public AMQInvalidArgumentException(string message)
+ : base(AMQConstant.INVALID_ARGUMENT.Code, message, null)
+ {
+ }
+
+ protected AMQInvalidArgumentException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+
+ }
+}
diff --git a/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs b/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs new file mode 100644 index 0000000000..962211de04 --- /dev/null +++ b/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs @@ -0,0 +1,46 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Runtime.Serialization;
+
+using Qpid.Protocol;
+
+namespace Qpid
+{
+ /// <summary>
+ /// Thrown when an invalid routing key was sent to the broker
+ /// </summary>
+ [Serializable]
+ public class AMQInvalidRoutingKeyException : AMQException
+ {
+ public AMQInvalidRoutingKeyException(string message)
+ : base(AMQConstant.INVALID_ROUTING_KEY.Code, message, null)
+ {
+ }
+
+ protected AMQInvalidRoutingKeyException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+
+ }
+}
diff --git a/dotnet/Qpid.Common/Protocol/AMQConstant.cs b/dotnet/Qpid.Common/Protocol/AMQConstant.cs index 560ac97122..04b4011567 100644 --- a/dotnet/Qpid.Common/Protocol/AMQConstant.cs +++ b/dotnet/Qpid.Common/Protocol/AMQConstant.cs @@ -77,11 +77,14 @@ namespace Qpid.Protocol public static readonly AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
public static readonly AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
public static readonly AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
- public static readonly AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
- public static readonly AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
public static readonly AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static readonly AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
public static readonly AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+ public static readonly AMQConstant ALREADY_EXISTS = new AMQConstant(405, "already exists", true);
+ public static readonly AMQConstant IN_USE = new AMQConstant(406, "in use", true);
+ public static readonly AMQConstant INVALID_ROUTING_KEY = new AMQConstant(407, "routing key invalid", true);
+ public static readonly AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "request timeout", true);
+ public static readonly AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
public static readonly AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static readonly AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
public static readonly AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true);
diff --git a/dotnet/Qpid.Common/Qpid.Common.csproj b/dotnet/Qpid.Common/Qpid.Common.csproj index ec85567a25..944fe24677 100644 --- a/dotnet/Qpid.Common/Qpid.Common.csproj +++ b/dotnet/Qpid.Common/Qpid.Common.csproj @@ -44,6 +44,8 @@ <Compile Include="AMQConnectionClosedException.cs" />
<Compile Include="AMQDisconnectedException.cs" />
<Compile Include="AMQException.cs" />
+ <Compile Include="AMQInvalidArgumentException.cs" />
+ <Compile Include="AMQInvalidRoutingKeyException.cs" />
<Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
@@ -208,4 +210,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file diff --git a/dotnet/Qpid.Common/lib/seclib-1.0.0/Org.Mentalis.Security.dll b/dotnet/Qpid.Common/lib/seclib-1.0.0/Org.Mentalis.Security.dll Binary files differnew file mode 100644 index 0000000000..c3b95d71ba --- /dev/null +++ b/dotnet/Qpid.Common/lib/seclib-1.0.0/Org.Mentalis.Security.dll diff --git a/dotnet/Qpid.Common/lib/seclib-1.0.0/seclib-license.txt b/dotnet/Qpid.Common/lib/seclib-1.0.0/seclib-license.txt new file mode 100644 index 0000000000..e837183135 --- /dev/null +++ b/dotnet/Qpid.Common/lib/seclib-1.0.0/seclib-license.txt @@ -0,0 +1,13 @@ +Source Code License
+
+Copyright © 2002-2006, The Mentalis.org Team
+All rights reserved.
+http://www.mentalis.org/
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+
+- Neither the name of the Mentalis.org Team, nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dotnet/Qpid.Sasl.Tests/Mechanisms/ExternalSaslClientTests.cs b/dotnet/Qpid.Sasl.Tests/Mechanisms/ExternalSaslClientTests.cs new file mode 100644 index 0000000000..7b6ced9f03 --- /dev/null +++ b/dotnet/Qpid.Sasl.Tests/Mechanisms/ExternalSaslClientTests.cs @@ -0,0 +1,72 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections;
+using System.Text;
+
+using NUnit.Framework;
+using Qpid.Sasl;
+using Qpid.Sasl.Mechanisms;
+
+namespace Qpid.Sasl.Tests.Mechanisms
+{
+ [TestFixture]
+ public class ExternalSaslClientTests : ISaslCallbackHandler
+ {
+ private const string AUTHID = "nobody@nowhere.com";
+
+ [Test]
+ public void ReturnsRightMechanismName()
+ {
+ ISaslClient client = new ExternalSaslClient(AUTHID, new Hashtable(), this);
+
+ Assert.AreEqual("EXTERNAL", client.MechanismName);
+ }
+
+ [Test]
+ public void HasInitialResponseReturnsTrue()
+ {
+ ISaslClient client = new ExternalSaslClient(AUTHID, new Hashtable(), this);
+
+ Assert.IsTrue(client.HasInitialResponse);
+ }
+
+ [Test]
+ public void CanEvaluateChallenge()
+ {
+ Hashtable props = new Hashtable();
+ ISaslClient client = new ExternalSaslClient(AUTHID, new Hashtable(), this);
+
+ Assert.IsFalse(client.IsComplete);
+ byte[] response = client.EvaluateChallenge(new byte[0]);
+ Assert.AreEqual(AUTHID, Encoding.UTF8.GetString(response));
+
+ Assert.IsTrue(client.IsComplete);
+ }
+
+ void ISaslCallbackHandler.Handle(ISaslCallback[] callbacks)
+ {
+ }
+
+ } // class AnonymousSaslClientTests
+
+} // namespace Qpid.Sasl.Tests.Mechanisms
diff --git a/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj b/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj index 50c2cfba6f..374db4b490 100644 --- a/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj +++ b/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj @@ -38,6 +38,7 @@ <Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="Mechanisms\ExternalSaslClientTests.cs" />
<Compile Include="TestClientFactory.cs" />
<Compile Include="Mechanisms\AnonymousSaslClientTests.cs" />
<Compile Include="Mechanisms\DigestSaslClientTests.cs" />
@@ -63,4 +64,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file diff --git a/dotnet/Qpid.Sasl.Tests/SaslTests.cs b/dotnet/Qpid.Sasl.Tests/SaslTests.cs index 01bb676fb6..52e07d0041 100644 --- a/dotnet/Qpid.Sasl.Tests/SaslTests.cs +++ b/dotnet/Qpid.Sasl.Tests/SaslTests.cs @@ -78,6 +78,17 @@ namespace Qpid.Sasl.Tests }
[Test]
+ public void CanCreateExternal()
+ {
+ Hashtable props = new Hashtable();
+ string[] mechanisms = new string[] { "EXTERNAL", "OTHER" };
+ ISaslClient client = Sasl.CreateClient(mechanisms, "", "", "", props, this);
+
+ Assert.IsNotNull(client);
+ Assert.IsInstanceOfType(typeof(ExternalSaslClient), client);
+ }
+
+ [Test]
public void ReturnsNullIfNoFactoryFound()
{
Hashtable props = new Hashtable();
diff --git a/dotnet/Qpid.Sasl/DefaultClientFactory.cs b/dotnet/Qpid.Sasl/DefaultClientFactory.cs index 43f0470a21..44df9c7ee2 100644 --- a/dotnet/Qpid.Sasl/DefaultClientFactory.cs +++ b/dotnet/Qpid.Sasl/DefaultClientFactory.cs @@ -21,7 +21,6 @@ using System;
using System.Collections;
-using System.Text;
using Qpid.Sasl.Mechanisms;
@@ -29,11 +28,12 @@ namespace Qpid.Sasl {
public class DefaultClientFactory : ISaslClientFactory
{
- private static readonly string[] SUPPORTED = new string[] {
+ private static readonly string[] SUPPORTED = new string[] {
DigestSaslClient.Mechanism,
CramMD5SaslClient.Mechanism,
PlainSaslClient.Mechanism,
AnonymousSaslClient.Mechanism,
+ ExternalSaslClient.Mechanism,
};
public string[] GetSupportedMechanisms(IDictionary props)
@@ -52,6 +52,7 @@ namespace Qpid.Sasl vetoed.Add(CramMD5SaslClient.Mechanism);
vetoed.Add(PlainSaslClient.Mechanism);
vetoed.Add(AnonymousSaslClient.Mechanism);
+ vetoed.Add(ExternalSaslClient.Mechanism);
}
if ( props.Contains(SaslProperties.PolicyNoAnonymous) )
{
@@ -85,6 +86,8 @@ namespace Qpid.Sasl return new AnonymousSaslClient(authorizationId, props, handler);
case DigestSaslClient.Mechanism:
return new DigestSaslClient(authorizationId, serverName, protocol, props, handler);
+ case ExternalSaslClient.Mechanism:
+ return new ExternalSaslClient(authorizationId, props, handler);
}
}
// unknown mechanism
diff --git a/dotnet/Qpid.Sasl/Mechanisms/ExternalSaslClient.cs b/dotnet/Qpid.Sasl/Mechanisms/ExternalSaslClient.cs new file mode 100644 index 0000000000..127591e2da --- /dev/null +++ b/dotnet/Qpid.Sasl/Mechanisms/ExternalSaslClient.cs @@ -0,0 +1,69 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Collections;
+using System.Text;
+
+namespace Qpid.Sasl.Mechanisms
+{
+ /// <summary>
+ /// Implements the EXTERNAL authentication mechanism
+ /// as outlined in RFC 2222
+ /// </summary>
+ public class ExternalSaslClient : SaslClient
+ {
+ public const string Mechanism = "EXTERNAL";
+
+ public ExternalSaslClient(
+ string authid, IDictionary properties,
+ ISaslCallbackHandler handler)
+ : base(authid, null, null, properties, handler)
+ {
+ }
+
+ #region ISaslClient Implementation
+ //
+ // ISaslClient Implementation
+ //
+
+ public override string MechanismName
+ {
+ get { return Mechanism; }
+ }
+
+ public override bool HasInitialResponse
+ {
+ get { return true; }
+ }
+
+ public override byte[] EvaluateChallenge(byte[] challenge)
+ {
+ // ignore challenge
+ SetComplete();
+ return Encoding.UTF8.GetBytes(AuthorizationId);
+ }
+
+ #endregion // ISaslClient Implementation
+
+ } // class ExternalSaslClient
+
+} // namespace Qpid.Sasl.Mechanisms
diff --git a/dotnet/Qpid.Sasl/Qpid.Sasl.csproj b/dotnet/Qpid.Sasl/Qpid.Sasl.csproj index fa7e91811f..41784a8413 100644 --- a/dotnet/Qpid.Sasl/Qpid.Sasl.csproj +++ b/dotnet/Qpid.Sasl/Qpid.Sasl.csproj @@ -38,6 +38,7 @@ <Compile Include="Configuration\SaslConfiguration.cs" />
<Compile Include="Configuration\SaslConfigurationSectionHandler.cs" />
<Compile Include="MD5HMAC.cs" />
+ <Compile Include="Mechanisms\ExternalSaslClient.cs" />
<Compile Include="SaslException.cs" />
<Compile Include="Mechanisms\AnonymousSaslClient.cs" />
<Compile Include="Mechanisms\DigestSaslClient.cs" />
@@ -60,4 +61,4 @@ <Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file |
